
tkal_create_file_lists.py
1 #!/usr/bin/env python
2 
3 from __future__ import print_function
4 
5 from builtins import range
6 import os
7 import re
8 import sys
9 import glob
10 import json
11 import math
12 import bisect
13 import random
14 import signal
15 import cPickle
16 import difflib
17 import argparse
18 import functools
19 import itertools
20 import subprocess
21 import collections
22 import multiprocessing
23 import FWCore.PythonUtilities.LumiList as LumiList
24 import Utilities.General.cmssw_das_client as cmssw_das_client
25 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
26 
27 
28 ################################################################################
29 def main(argv = None):
30  """
31  Main routine. Not called if this module is loaded via `import`.
32 
33  Arguments:
34  - `argv`: Command line arguments passed to the script.
35  """
36 
37  if argv == None:
38  argv = sys.argv[1:]
39 
40  file_list_creator = FileListCreator(argv)
41  file_list_creator.create()
42 
43 
44 ################################################################################
45 class FileListCreator(object):
46  """Create file lists for alignment and validation for a given dataset.
47  """
48 
49  def __init__(self, argv):
50  """Constructor taking the command line arguments.
51 
52  Arguments:
53  - `argv`: command line arguments
54  """
55 
56  self._first_dataset_ini = True
57  self._parser = self._define_parser()
58  self._args = self._parser.parse_args(argv)
59 
60  if not mps_tools.check_proxy():
61  print_msg(
62  "Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
63  sys.exit(1)
64 
65  self._dataset_regex = re.compile(r"^/([^/]+)/([^/]+)/([^/]+)$")
66  self._validate_input()
67 
68  if self._args.test_mode:
69  import Configuration.PyReleaseValidation.relval_steps as rvs
70  import Configuration.PyReleaseValidation.relval_production as rvp
71  self._args.datasets = [rvs.steps[rvp.workflows[1000][1][0]]["INPUT"].dataSet]
72  self._validate_input() # ensure that this change is valid
73 
74  self._datasets = sorted([dataset
75  for pattern in self._args.datasets
76  for dataset in get_datasets(pattern)
77  if re.search(self._args.dataset_filter, dataset)])
78  if len(self._datasets) == 0:
79  print_msg("Found no dataset matching the pattern(s):")
80  for d in self._args.datasets: print_msg("\t"+d)
81  sys.exit(1)
82 
83  self._formatted_dataset = merge_strings(
84  [re.sub(self._dataset_regex, r"\1_\2_\3", dataset)
85  for dataset in self._datasets])
86  self._output_dir = os.path.join(self._args.output_dir,
87  self._formatted_dataset)
88  self._output_dir = os.path.abspath(self._output_dir)
89  self._cache = _DasCache(self._output_dir)
90  self._prepare_iov_datastructures()
91  self._prepare_run_datastructures()
92 
93  try:
94  os.makedirs(self._output_dir)
95  except OSError as e:
96  if e.args == (17, "File exists"):
97  if self._args.force:
98  pass # do nothing, just clear the existing output
99  elif self._args.use_cache:
100  self._cache.load() # load cache before clearing the output
101  else:
102  print_msg("Directory '{}' already exists from previous runs"
103  " of the script. Use '--use-cache' if you want to"
104  " use the cached DAS-query results or use "
105  "'--force' to remove it."
106  .format(self._output_dir))
107  sys.exit(1)
108  files = glob.glob(os.path.join(self._output_dir, "*"))
109  for f in files: os.remove(f)
110  else:
111  raise
112 
113 
114  def create(self):
115  """Creates file list. To be called by user of the class."""
116 
117  self._request_dataset_information()
118  self._create_file_lists()
119  self._print_eventcounts()
120  self._write_file_lists()
121 
122 
123  _event_count_log = "event_count_info.log"
124 
125 
126  def _define_parser(self):
127  """Definition of command line argument parser."""
128 
129  parser = argparse.ArgumentParser(
130  description = "Create file lists for alignment",
131  epilog = ("The tool will create a directory containing all file "
132  "lists and a log file with all relevant event counts "
133  "('{}').".format(FileListCreator._event_count_log)))
134  parser.add_argument("-i", "--input", dest = "datasets", required = True,
135  metavar = "DATASET", action = "append",
136  help = ("CMS dataset name; supports wildcards; "
137  "use multiple times for multiple datasets"))
138  parser.add_argument("--dataset-filter", default = "",
139  help = "regex to match within the matched datasets, "
140  "in case the wildcard isn't flexible enough")
141  parser.add_argument("-j", "--json", dest = "json", metavar = "PATH",
142  help = "path to JSON file (optional)")
143  parser.add_argument("-f", "--fraction", dest = "fraction",
144  type = float, default = 1,
145  help = "max. fraction of files used for alignment")
146  parser.add_argument("--iov", dest = "iovs", metavar = "RUN", type = int,
147  action = "append", default = [],
148  help = ("define IOV by specifying first run; for "
149  "multiple IOVs use this option multiple "
150  "times; files from runs before the lowest "
151  "IOV are discarded (default: 1)"))
152  parser.add_argument("--miniiov", dest="miniiovs", metavar="RUN", type=int,
153  action="append", default=[],
154  help=("in addition to the standard IOVs, break up hippy jobs "
155  "at these points, so that jobs from before and after "
156  "these runs are not in the same job"))
157  parser.add_argument("-r", "--random", action = "store_true",
158  default = False, help = "select files randomly")
159  parser.add_argument("-n", "--events-for-alignment", "--maxevents",
160  dest = "events", type = int, metavar = "NUMBER",
161  help = ("number of events needed for alignment; the"
162  " remaining events in the dataset are used "
163  "for validation; if n<=0, all events are "
164  "used for validation"))
165  parser.add_argument("--all-events", action = "store_true",
166  help = "Use all events for alignment")
167  parser.add_argument("--tracks-for-alignment", dest = "tracks",
168  type = int, metavar = "NUMBER",
169  help = "number of tracks needed for alignment")
170  parser.add_argument("--track-rate", dest = "rate", type = float,
171  metavar = "NUMBER",
172  help = "number of tracks per event")
173  parser.add_argument("--run-by-run", dest = "run_by_run",
174  action = "store_true", default = False,
175  help = "create validation file list for each run")
176  parser.add_argument("--minimum-events-in-iov",
177  dest = "minimum_events_in_iov", metavar = "NUMBER",
178  type = int, default = 100000,
179  help = ("minimum number of events for alignment per"
180  " IOV; this option has a higher priority "
181  "than '-f/--fraction' "
182  "(default: %(default)s)"))
183  parser.add_argument("--minimum-events-validation",
184  dest = "minimum_events_validation",
185  metavar = "NUMBER", type = int, default = 1,
186  help = ("minimum number of events for validation; "
187  "applies to IOVs; in case of --run-by-run "
188  "it applies to runs "
189  "(default: %(default)s)"))
190  parser.add_argument("--use-cache", dest = "use_cache",
191  action = "store_true", default = False,
192  help = "use DAS-query results of previous run")
193  parser.add_argument("-o", "--output-dir", dest = "output_dir",
194  metavar = "PATH", default = os.getcwd(),
195  help = "output base directory (default: %(default)s)")
196  parser.add_argument("--create-ini", dest = "create_ini",
197  action = "store_true", default = False,
198  help = ("create dataset ini file based on the "
199  "created file lists"))
200  parser.add_argument("--force", action = "store_true", default = False,
201  help = ("remove output directory from previous "
202  "runs, if existing"))
203  parser.add_argument("--hippy-events-per-job", type = int, default = 1,
204  help = ("approximate number of events in each job for HipPy"))
205  parser.add_argument("--test-mode", dest = "test_mode",
206  action = "store_true", default = False,
207  help = argparse.SUPPRESS) # hidden option
208  return parser
209 
210 
211  def _validate_input(self):
212  """Validate command line arguments."""
213 
214  if self._args.events is None:
215  if self._args.all_events:
216  self._args.events = float("inf")
217  print_msg("Using all tracks for alignment")
218  elif (self._args.tracks is None) and (self._args.rate is None):
219  msg = ("either -n/--events-for-alignment, --all-events, or both of "
220  "--tracks-for-alignment and --track-rate are required")
221  self._parser.error(msg)
222  elif (((self._args.tracks is not None) and (self._args.rate is None)) or
223  ((self._args.rate is not None) and (self._args.tracks is None))):
224  msg = ("--tracks-for-alignment and --track-rate must be used "
225  "together")
226  self._parser.error(msg)
227  else:
228  self._args.events = int(math.ceil(self._args.tracks /
229  self._args.rate))
230  print_msg("Requested {0:d} tracks with {1:.2f} tracks/event "
231  "-> {2:d} events for alignment."
232  .format(self._args.tracks, self._args.rate,
233  self._args.events))
234  else:
235  if (self._args.tracks is not None) or (self._args.rate is not None) or self._args.all_events:
236  msg = ("-n/--events-for-alignment must not be used with "
237  "--tracks-for-alignment, --track-rate, or --all-events")
238  self._parser.error(msg)
239  print_msg("Requested {0:d} events for alignment."
240  .format(self._args.events))
241 
242  for dataset in self._args.datasets:
243  if not re.match(self._dataset_regex, dataset):
244  print_msg("Dataset pattern '"+dataset+"' is not in CMS format.")
245  sys.exit(1)
246 
247  nonzero_events_per_iov = (self._args.minimum_events_in_iov > 0)
248  if nonzero_events_per_iov and self._args.fraction <= 0:
249  print_msg("Setting minimum number of events per IOV for alignment "
250  "to 0 because a non-positive fraction of alignment events"
251  " is chosen: {}".format(self._args.fraction))
252  nonzero_events_per_iov = False
253  self._args.minimum_events_in_iov = 0
254  if nonzero_events_per_iov and self._args.events <= 0:
255  print_msg("Setting minimum number of events per IOV for alignment "
256  "to 0 because a non-positive number of alignment events"
257  " is chosen: {}".format(self._args.events))
258  nonzero_events_per_iov = False
259  self._args.minimum_events_in_iov = 0
260 
261 
262  def _prepare_iov_datastructures(self):
263  """Create the needed objects for IOV handling."""
264 
265  self._iovs = sorted(set(self._args.iovs))
266  if len(self._iovs) == 0: self._iovs.append(1)
267  self._iov_info_alignment = {iov: {"events": 0, "files": []}
268  for iov in self._iovs}
269  self._iov_info_validation = {iov: {"events": 0, "files": []}
270  for iov in self._iovs}
271 
272  self._miniiovs = sorted(set(self._iovs) | set(self._args.miniiovs))
273 
274 
275  def _get_iovs(self, runs, useminiiovs=False):
276  """
277  Return the list of IOV starts for the runs in `runs`. Runs before any
278  defined IOV are skipped.
279 
280  Arguments:
281  - `runs`: run numbers
282  """
283 
284  iovlist = self._miniiovs if useminiiovs else self._iovs
285 
286  iovs = []
287  for run in runs:
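 # bisect.bisect gives the insertion point of `run` in the sorted IOV list;
 # the entry just before it is the latest (mini-)IOV start <= `run`. Runs
 # before the first IOV yield index 0 and are skipped.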
288  iov_index = bisect.bisect(iovlist, run)
289  if iov_index > 0: iovs.append(iovlist[iov_index-1])
290  return iovs
291 
292 
293  def _prepare_run_datastructures(self):
294  """Create the needed objects for run-by-run validation file lists."""
295 
296  self._run_info = {}
297 
298 
299  def _add_file_info(self, container, keys, fileinfo):
300  """Add the file described by `fileinfo` to `container` under each key in `keys`.
301 
302  Arguments:
303  - `container`: dictionary holding information on files and event counts
304  - `keys`: keys to which the info should be added; will be created if not
305  existing
306  - `fileinfo`: `FileInfo` object for the dataset file
307  """
308 
309  for key in keys:
310  if key not in container:
311  container[key] = {"events": 0,
312  "files": []}
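 # a file whose runs span several keys (IOVs or runs) contributes an equal
 # share of its events to each key, so per-key counts are approximate there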
313  container[key]["events"] += fileinfo.nevents / len(keys)
314  if fileinfo not in container[key]["files"]:
315  container[key]["files"].append(fileinfo)
316 
317 
318  def _remove_file_info(self, container, keys, fileinfo):
319  """Remove the file described by `fileinfo` from `container` for each key in `keys`.
320 
321  Arguments:
322  - `container`: dictionary holding information on files and event counts
323  - `keys`: keys from which the info should be removed
324  - `fileinfo`: `FileInfo` object for the dataset file
326  """
327 
328  for key in keys:
329  if key not in container: continue
330  try:
331  index = container[key]["files"].index(fileinfo)
332  except ValueError: # file not found
333  return
334  del container[key]["files"][index]
335  container[key]["events"] -= fileinfo.nevents / len(keys)
336 
337 
338  def _request_dataset_information(self):
339  """Retrieve general dataset information and create file list."""
340 
341  if not self._cache.empty:
342  print_msg("Using cached information.")
343  (self._events_in_dataset,
344  self._files,
345  self._file_info,
346  self._max_run) = self._cache.get()
347  self.rereco = any(len(fileinfo.runs)>1 for fileinfo in self._file_info)
348  if self._args.random: random.shuffle(self._files)
349  return
350 
351  # workaround to deal with KeyboardInterrupts in the worker processes:
352  # - ignore interrupt signals in workers (see initializer)
353  # - use a timeout of size sys.maxsize to avoid a bug in multiprocessing
354  number_of_processes = multiprocessing.cpu_count() - 1
355  number_of_processes = (number_of_processes
356  if number_of_processes > 0
357  else 1)
358  pool = multiprocessing.Pool(
359  processes = number_of_processes,
360  initializer = lambda: signal.signal(signal.SIGINT, signal.SIG_IGN))
361 
362  print_msg("Requesting information for the following dataset(s):")
363  for d in self._datasets: print_msg("\t"+d)
364  print_msg("This may take a while...")
365 
366  result = pool.map_async(get_events_per_dataset, self._datasets).get(sys.maxsize)
367  self._events_in_dataset = sum(result)
368 
369  result = pool.map_async(get_max_run, self._datasets).get(sys.maxsize)
370  self._max_run = max(result)
371 
372  result = sum(pool.map_async(get_file_info, self._datasets).get(sys.maxsize), [])
373  files = pool.map_async(_make_file_info, result).get(sys.maxsize)
374  self._file_info = sorted(fileinfo for fileinfo in files)
375 
376  self.rereco = any(len(fileinfo.runs)>1 for fileinfo in self._file_info)
377 
378  if self._args.test_mode:
379  self._file_info = self._file_info[-200:] # take only last chunk of files
380  self._files = [fileinfo.name for fileinfo in self._file_info]
381 
382  # write information to cache
383  self._cache.set(self._events_in_dataset, self._files, self._file_info,
384  self._max_run)
385  self._cache.dump()
386  if self._args.random:
387  random.shuffle(self._file_info)
388  self._files = [fileinfo.name for fileinfo in self._file_info]
389 
390  def _create_file_lists(self):
391  """Create file lists for alignment and validation."""
392 
393  # collect files for alignment until minimal requirements are fulfilled
394  self._events_for_alignment = 0
395  self._events_for_validation = 0
396  self._files_alignment = []
397  self._files_validation = []
398 
399  max_range = (0
400  if self._args.events <= 0
401  else int(math.ceil(len(self._files)*self._args.fraction)))
402  use_for_alignment = True
403  for i, fileinfo in enumerate(self._file_info):
404  enough_events = self._events_for_alignment >= self._args.events
405  fraction_exceeded = i >= max_range
406  if enough_events or fraction_exceeded: use_for_alignment = False
407 
408  dataset, f, number_of_events, runs = fileinfo
409 
410  iovs = self._get_iovs(runs)
411  if use_for_alignment:
412  if iovs:
413  self._events_for_alignment += number_of_events
414  self._files_alignment.append(fileinfo)
415  self._add_file_info(self._iov_info_alignment, iovs, fileinfo)
416  else:
417  max_range += 1 # not used -> discard in fraction calculation
418  else:
419  if iovs:
420  self._events_for_validation += number_of_events
421  self._files_validation.append(fileinfo)
422  self._add_file_info(self._iov_info_validation, iovs, fileinfo)
423  if self._args.run_by_run:
424  self._add_file_info(self._run_info, runs, fileinfo)
425 
426  self._fulfill_iov_eventcount()
427 
428  self._split_hippy_jobs()
429 
430 
431  def _fulfill_iov_eventcount(self):
432  """
433  Try to fulfill the requirement on the minimum number of events per IOV
434  in the alignment file list by picking files from the validation list.
435  """
436 
437  for iov in self._iovs:
438  if self._iov_info_alignment[iov]["events"] >= self._args.minimum_events_in_iov: continue
439  for fileinfo in self._files_validation[:]:
440  dataset, f, number_of_events, runs = fileinfo
441  iovs = self._get_iovs(runs)
442  if iov in iovs:
443  self._files_alignment.append(fileinfo)
444  self._events_for_alignment += number_of_events
445  self._add_file_info(self._iov_info_alignment, iovs, fileinfo)
446 
447  self._events_for_validation -= number_of_events
448  self._remove_file_info(self._iov_info_validation, iovs, fileinfo)
449  if self._args.run_by_run:
450  self._remove_file_info(self._run_info, runs, fileinfo)
451  self._files_validation.remove(fileinfo)
452 
453  if (self._iov_info_alignment[iov]["events"]
454  >= self._args.minimum_events_in_iov):
455  break # break the file loop if already enough events
456 
457  def _split_hippy_jobs(self):
458  hippyjobs = {}
459  for dataset, miniiov in itertools.product(self._datasets, self._miniiovs):
460  jobsforminiiov = []
461  hippyjobs[dataset,miniiov] = jobsforminiiov
462  eventsinthisjob = float("inf")
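 # starting from an 'infinite' event count forces the first matching file
 # to open a new job in the loop below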
463  for fileinfo in self._files_alignment:
464  if fileinfo.dataset != dataset: continue
465  miniiovs = set(self._get_iovs(fileinfo.runs, useminiiovs=True))
466  if miniiov not in miniiovs: continue
467  if len(miniiovs) > 1:
468  hippyjobs[dataset,miniiov] = []
469  if eventsinthisjob >= self._args.hippy_events_per_job:
470  currentjob = []
471  jobsforminiiov.append(currentjob)
472  eventsinthisjob = 0
473  currentjob.append(fileinfo)
474  currentjob.sort()
475  eventsinthisjob += fileinfo.nevents
476 
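 # regroup the per-mini-IOV jobs by their enclosing alignment IOV: a mini-IOV
 # belongs to the largest IOV start that is not larger than the mini-IOV itself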
477  self._hippy_jobs = {
478  (dataset, iov): sum((hippyjobs[dataset, miniiov]
479  for miniiov in self._miniiovs
480  if iov == max(_ for _ in self._iovs if _ <= miniiov)), []
481  )
482  for dataset, iov in itertools.product(self._datasets, self._iovs)
483  }
484 
485  def _print_eventcounts(self):
486  """Print the event counts per file list and per IOV."""
487 
488  log = os.path.join(self._output_dir, FileListCreator._event_count_log)
489 
490  print_msg("Using {0:d} events for alignment ({1:.2f}%)."
491  .format(self._events_for_alignment,
492  100.0*
493  self._events_for_alignment/self._events_in_dataset),
494  log_file = log)
495  for iov in sorted(self._iov_info_alignment):
496  print_msg(("Approximate events" if self.rereco else "Events") + " for alignment in IOV since {0:d}: {1:d}"
497  .format(iov, self._iov_info_alignment[iov]["events"]),
498  log_file = log)
499 
500  print_msg("Using {0:d} events for validation ({1:.2f}%)."
501  .format(self._events_for_validation,
502  100.0*
503  self._events_for_validation/self._events_in_dataset),
504  log_file = log)
505 
506  for iov in sorted(self._iov_info_validation):
507  msg = ("Approximate events" if self.rereco else "Events") + " for validation in IOV since {0:d}: {1:d}".format(
508  iov, self._iov_info_validation[iov]["events"])
509  if (self._iov_info_validation[iov]["events"]
510  < self._args.minimum_events_validation):
511  msg += " (not enough events -> no dataset file will be created)"
512  print_msg(msg, log_file = log)
513 
514  for run in sorted(self._run_info):
515  msg = ("Approximate events" if self.rereco else "Events") + " for validation in run {0:d}: {1:d}".format(
516  run, self._run_info[run]["events"])
517  if (self._run_info[run]["events"]
518  < self._args.minimum_events_validation):
519  msg += " (not enough events -> no dataset file will be created)"
520  print_msg(msg, log_file = log)
521 
522  unused_events = (self._events_in_dataset
523  - self._events_for_validation
524  - self._events_for_alignment)
525  if unused_events > 0 != self._events_in_dataset:
526  print_msg("Unused events: {0:d} ({1:.2f}%)"
527  .format(unused_events,
528  100.0*unused_events/self._events_in_dataset),
529  log_file = log)
530 
531 
532  def _create_dataset_ini_section(self, name, collection, json_file = None):
533  """Write dataset ini snippet.
534 
535  Arguments:
536  - `name`: name of the dataset section
537  - `collection`: track collection of this dataset
538  - `json_file`: JSON file to be used for this dataset (optional)
539  """
540 
541  if json_file:
542  splitted = name.split("_since")
543  file_list = "_since".join(splitted[:-1]
544  if len(splitted) > 1
545  else splitted)
546  else:
547  file_list = name
548  output = "[dataset:{}]\n".format(name)
549  output += "collection = {}\n".format(collection)
550  output += "inputFileList = ${{datasetdir}}/{}.txt\n".format(file_list)
551  output += "json = ${{datasetdir}}/{}\n".format(json_file) if json_file else ""
552 
553  if collection in ("ALCARECOTkAlCosmicsCTF0T",
554  "ALCARECOTkAlCosmicsInCollisions"):
555  if self._first_dataset_ini:
556  print_msg("\tDetermined cosmics dataset, i.e. please replace "
557  "'DUMMY_DECO_MODE_FLAG' and 'DUMMY_ZERO_TESLA_FLAG' "
558  "with the correct values.")
559  self._first_dataset_ini = False
560  output += "cosmicsDecoMode = DUMMY_DECO_MODE_FLAG\n"
561  output += "cosmicsZeroTesla = DUMMY_ZERO_TESLA_FLAG\n"
562  output += "\n"
563 
564  return output
565 
566 
567  def _create_json_file(self, name, first, last = None):
568  """
569  Create JSON file with `name` covering runs from `first` to `last`. If a
570  global JSON is provided, the resulting file is the intersection of the
571  file created here and the global one.
572  Returns the name of the created JSON file.
573 
574  Arguments:
575  - `name`: name of the created JSON file
576  - `first`: first run covered by the JSON file
577  - `last`: last run covered by the JSON file
578 
579  """
580 
581  if last is None: last = self._max_run
582  name += "_JSON.txt"
583  print_msg("Creating JSON file: "+name)
584 
585  json_file = LumiList.LumiList(runs = range(first, last+1))
586  if self._args.json:
587  global_json = LumiList.LumiList(filename = self._args.json)
588  json_file = json_file & global_json
589  json_file.writeJSON(os.path.join(self._output_dir, name))
590 
591  return name
592 
593 
594  def _get_track_collection(self, edm_file):
595  """Extract track collection from given `edm_file`.
596 
597  Arguments:
598  - `edm_file`: CMSSW dataset file
599  """
600 
601  # use global redirector to allow also files not yet at your site:
602  cmd = ["edmDumpEventContent", r"root://cms-xrd-global.cern.ch/"+edm_file]
603  try:
604  event_content = subprocess.check_output(cmd).split("\n")
605  except subprocess.CalledProcessError as e:
606  splitted = edm_file.split("/")
607  try:
608  alcareco = splitted[splitted.index("ALCARECO")+1].split("-")[0]
609  alcareco = alcareco.replace("TkAlCosmics0T", "TkAlCosmicsCTF0T")
610  alcareco = "ALCARECO" + alcareco
611  print_msg("\tDetermined track collection as '{}'.".format(alcareco))
612  return alcareco
613  except ValueError:
614  if "RECO" in splitted:
615  print_msg("\tDetermined track collection as 'generalTracks'.")
616  return "generalTracks"
617  else:
618  print_msg("\tCould not determine track collection "
619  "automatically.")
620  print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
621  "the correct value.")
622  return "DUMMY_TRACK_COLLECTION"
623 
624  track_collections = []
625  for line in event_content:
626  splitted = line.split()
627  if len(splitted) > 0 and splitted[0] == r"vector<reco::Track>":
628  track_collections.append(splitted[1].strip().strip('"'))
629  if len(track_collections) == 0:
630  print_msg("No track collection found in file '{}'.".format(edm_file))
631  sys.exit(1)
632  elif len(track_collections) == 1:
633  print_msg("\tDetermined track collection as "
634  "'{}'.".format(track_collections[0]))
635  return track_collections[0]
636  else:
637  alcareco_tracks = list(filter(lambda x: x.startswith("ALCARECO"),
638  track_collections))
639  if len(alcareco_tracks) == 0 and "generalTracks" in track_collections:
640  print_msg("\tDetermined track collection as 'generalTracks'.")
641  return "generalTracks"
642  elif len(alcareco_tracks) == 1:
643  print_msg("\tDetermined track collection as "
644  "'{}'.".format(alcareco_tracks[0]))
645  return alcareco_tracks[0]
646  print_msg("\tCould not unambiguously determine track collection in "
647  "file '{}':".format(edm_file))
648  print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
649  "the correct value from the following list.")
650  for collection in track_collections:
651  print_msg("\t - "+collection)
652  return "DUMMY_TRACK_COLLECTION"
653 
654 
655  def _write_file_lists(self):
656  """Write file lists to disk."""
657 
658  self._create_dataset_txt(self._formatted_dataset, self._files_alignment)
659  self._create_hippy_txt(self._formatted_dataset, sum(self._hippy_jobs.values(), []))
660  self._create_dataset_cff(
661  "_".join(["Alignment", self._formatted_dataset]),
662  self._files_alignment)
663 
664  self._create_dataset_cff(
665  "_".join(["Validation", self._formatted_dataset]),
666  self._files_validation)
667 
668 
669  if self._args.create_ini:
670  dataset_ini_general = "[general]\n"
671  dataset_ini_general += "datasetdir = {}\n".format(self._output_dir)
672  dataset_ini_general += ("json = {}\n\n".format(self._args.json)
673  if self._args.json
674  else "\n")
675 
676  ini_path = self._formatted_dataset + ".ini"
677  print_msg("Creating dataset ini file: " + ini_path)
678  ini_path = os.path.join(self._output_dir, ini_path)
679 
680  collection = self._get_track_collection(self._files[0])
681 
682  with open(ini_path, "w") as f:
683  f.write(dataset_ini_general)
684  f.write(self._create_dataset_ini_section(
685  self._formatted_dataset, collection))
686 
687  iov_wise_ini = dataset_ini_general
688 
689  for i,iov in enumerate(sorted(self._iovs)):
690  iov_str = "since{0:d}".format(iov)
691  iov_str = "_".join([self._formatted_dataset, iov_str])
692 
693  if self.rereco:
694  if i == len(self._iovs) - 1:
695  last = None
696  else:
697  last = sorted(self._iovs)[i+1] - 1
698  local_json = self._create_json_file(iov_str, iov, last)
699  else:
700  local_json = None
701 
702  if self._args.create_ini:
703  iov_wise_ini += self._create_dataset_ini_section(iov_str,
704  collection,
705  local_json)
706 
707  self._create_dataset_txt(iov_str,
708  self._iov_info_alignment[iov]["files"])
709  self._create_hippy_txt(iov_str, sum((self._hippy_jobs[dataset,iov] for dataset in self._datasets), []))
710  self._create_dataset_cff(
711  "_".join(["Alignment", iov_str]),
712  self._iov_info_alignment[iov]["files"],
713  json_file=local_json)
714 
715  if (self._iov_info_validation[iov]["events"]
716  < self._args.minimum_events_validation):
717  continue
718  self._create_dataset_cff(
719  "_".join(["Validation", iov_str]),
720  self._iov_info_validation[iov]["files"],
721  json_file=local_json)
722 
723  if self._args.create_ini and iov_wise_ini != dataset_ini_general:
724  ini_path = self._formatted_dataset + "_IOVs.ini"
725  print_msg("Creating dataset ini file: " + ini_path)
726  ini_path = os.path.join(self._output_dir, ini_path)
727  with open(ini_path, "w") as f: f.write(iov_wise_ini)
728 
729  for run in sorted(self._run_info):
730  if self.rereco: continue # need to implement more JSONs
731  if (self._run_info[run]["events"]
732  < self._args.minimum_events_validation):
733  continue
734  self._create_dataset_cff(
735  "_".join(["Validation", self._formatted_dataset, str(run)]),
736  self._run_info[run]["files"])
737 
738 
739  def _create_dataset_txt(self, name, file_list):
740  """Write alignment file list to disk.
741 
742  Arguments:
743  - `name`: name of the file list
744  - `file_list`: list of files to write to `name`
745  """
746 
747  name += ".txt"
748  print_msg("Creating dataset file list: "+name)
749  with open(os.path.join(self._output_dir, name), "w") as f:
750  f.write("\n".join(fileinfo.name for fileinfo in file_list))
751 
752 
753  def _create_hippy_txt(self, name, job_list):
754  name += "_hippy.txt"
755  print_msg("Creating dataset file list for HipPy: "+name)
756  with open(os.path.join(self._output_dir, name), "w") as f:
757  f.write("\n".join(",".join("'"+fileinfo.name+"'" for fileinfo in job) for job in job_list)+"\n")
758 
759 
760  def _create_dataset_cff(self, name, file_list, json_file = None):
761  """
762  Create configuration fragment to define a dataset.
763 
764  Arguments:
765  - `name`: name of the configuration fragment
766  - `file_list`: list of files to write to `name`
767  - `json_file`: JSON file to be used for this dataset (optional)
768  """
769 
770  if json_file is None: json_file = self._args.json # might still be None
771  if json_file is not None:
772  json_file = os.path.join(self._output_dir, json_file)
773 
774  name = "_".join(["Dataset",name, "cff.py"])
775  print_msg("Creating dataset configuration fragment: "+name)
776 
777  file_list_str = ""
778  for sub_list in get_chunks(file_list, 255):
779  file_list_str += ("readFiles.extend([\n'"+
780  "',\n'".join(fileinfo.name for fileinfo in sub_list)+
781  "'\n])\n")
782 
783  fragment = FileListCreator._dataset_template.format(
784  lumi_def = ("import FWCore.PythonUtilities.LumiList as LumiList\n\n"
785  "lumiSecs = cms.untracked.VLuminosityBlockRange()\n"
786  "goodLumiSecs = LumiList.LumiList(filename = "
787  "'{0:s}').getCMSSWString().split(',')"
788  .format(json_file)
789  if json_file else ""),
790  lumi_arg = ("lumisToProcess = lumiSecs,\n "
791  if json_file else ""),
792  lumi_extend = "lumiSecs.extend(goodLumiSecs)" if json_file else "",
793  files = file_list_str)
794 
795  with open(os.path.join(self._output_dir, name), "w") as f:
796  f.write(fragment)
797 
798 
799  _dataset_template = """\
800 import FWCore.ParameterSet.Config as cms
801 {lumi_def:s}
802 readFiles = cms.untracked.vstring()
803 source = cms.Source("PoolSource",
804  {lumi_arg:s}fileNames = readFiles)
805 {files:s}{lumi_extend:s}
806 maxEvents = cms.untracked.PSet(input = cms.untracked.int32(-1))
807 """
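 # With no JSON file the lumi placeholders collapse and the rendered fragment
 # is essentially (file names illustrative):
 #   import FWCore.ParameterSet.Config as cms
 #   readFiles = cms.untracked.vstring()
 #   source = cms.Source("PoolSource", fileNames = readFiles)
 #   readFiles.extend(['/store/...File1.root', '/store/...File2.root'])
 #   maxEvents = cms.untracked.PSet(input = cms.untracked.int32(-1))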
808 
809 
810 class _DasCache(object):
811  """Helper class to cache information from DAS requests."""
812 
813  def __init__(self, file_list_id):
814  """Constructor of the cache.
815 
816  Arguments:
817  - `file_list_id`: ID of the cached file lists
818  """
819 
820  self._file_list_id = file_list_id
821  self._cache_file_name = os.path.join(file_list_id, ".das_cache.pkl")
822  self.reset()
823 
824 
825  def reset(self):
826  """Reset the cache contents and the 'empty' flag."""
827 
828  self._empty = True
829  self._events_in_dataset = None
830  self._files = []
831  self._file_info = []
832  self._max_run = None
833 
834 
835  def set(self, total_events, file_list, file_info, max_run):
836  """Set the content of the cache.
837 
838  Arguments:
839  - `total_events`: total number of events in dataset
840  - `file_list`: list of files in dataset
841  - `file_info`: dictionary with numbers of events per file
842  - `max_run`: highest run number contained in the dataset
843  """
844 
845  self._events_in_dataset = total_events
846  self._files = file_list
847  self._file_info = file_info
848  self._max_run = max_run
849  self._empty = False
850 
851 
852  def get(self):
853  """
854  Get the content of the cache as tuple:
855  result = (total number of events in dataset,
856  list of files in dataset,
857  dictionary with numbers of events and runs per file)
858  """
859 
860  return self._events_in_dataset, self._files, self._file_info, self._max_run
861 
862 
863  def load(self):
864  """Loads the cached contents."""
865 
866  if not self.empty:
867  print_msg("Overriding file information with cached information.")
868  try:
869  with open(self._cache_file_name, "rb") as f:
870  tmp_dict = cPickle.load(f)
871  self.__dict__.update(tmp_dict)
872  except IOError as e:
873  if e.args == (2, "No such file or directory"):
874  msg = "Failed to load cache for '{}'.".format(self._file_list_id)
875  if not self.empty:
876  msg += " Keeping the previous file information."
877  print_msg(msg)
878  else:
879  raise
880 
881 
882  def dump(self):
883  """Dumps the contents to the cache file."""
884 
885  if self.empty:
886  print_msg("Cache is empty. Not writing to file.")
887  return
888 
889  with open(self._cache_file_name, "wb") as f:
890  cPickle.dump(self.__dict__, f, 2)
891 
892 
893  @property
894  def empty(self):
895  """
896  Flag indicating whether the cache is empty or has been filled (possibly
897  with nothing).
898  """
899 
900  return self._empty
901 
902 
903 
904 ################################################################################
905 def das_client(query, check_key = None):
906  """
907  Submit `query` to DAS client and handle possible errors.
908  Further treatment of the output might be necessary.
909 
910  Arguments:
911  - `query`: DAS query
912  - `check_key`: optional key to be checked for; retriggers query if needed
913  """
914 
915  error = True
916  for i in range(5): # maximum of 5 tries
917  try:
918  das_data = cmssw_das_client.get_data(query, limit = 0)
919  except IOError as e:
920  if e.errno == 14: #https://stackoverflow.com/q/36397853/5228524
921  continue
922  except ValueError as e:
923  if str(e) == "No JSON object could be decoded":
924  continue
925 
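 # even a formally successful reply may lack the requested payload, so check
 # for `check_key` in the returned data and retry the query if it is missing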
926  if das_data["status"] == "ok":
927  if das_data["nresults"] == 0 or check_key is None:
928  error = False
929  break
930 
931  result_count = 0
932  for d in find_key(das_data["data"], [check_key]):
933  result_count += len(d)
934  if result_count == 0:
935  das_data["status"] = "error"
936  das_data["reason"] = ("DAS did not return required data.")
937  continue
938  else:
939  error = False
940  break
941 
942  if das_data["status"] == "error":
943  print_msg("DAS query '{}' failed 5 times. "
944  "The last time for the following reason:".format(query))
945  print(das_data["reason"])
946  sys.exit(1)
947  return das_data["data"]
948 
949 
950 def find_key(collection, key_chain):
951  """Searches for the keys in `key_chain` in `collection` and returns the first corresponding value.
952 
953  Arguments:
954  - `collection`: list of dictionaries
955  - `key_chain`: chain of keys to be searched for
956  """
957 
958  result = None
959  for i,key in enumerate(key_chain):
960  for item in collection:
961  if key in item:
962  if i == len(key_chain) - 1:
963  result = item[key]
964  else:
965  try:
966  result = find_key(item[key], key_chain[i+1:])
967  except LookupError:
968  pass # continue with next `item` in `collection`
969  else:
970  pass # continue with next `item` in `collection`
971 
972  if result is not None: return result
973  raise LookupError(key_chain, collection) # put
974 
975 
976 def print_msg(text, line_break = True, log_file = None):
977  """Formatted printing of `text`.
978 
979  Arguments:
980  - `text`: string to be printed
981  """
982 
983  msg = " >>> " + str(text)
984  if line_break:
985  print(msg)
986  else:
987  print(msg, end=' ')
988  sys.stdout.flush()
989  if log_file:
990  with open(log_file, "a") as f: f.write(msg+"\n")
991  return msg
992 
993 
994 def get_runs(file_name):
995  """
996  Try to guess the run number from `file_name`. If it cannot be
997  determined, the run numbers are retrieved from DAS (slow!).
998 
999  Arguments:
1000  - `file_name`: name of the considered file
1001  """
1002  try:
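 # data file LFNs encode the run number in consecutive path blocks, e.g.
 # '.../000/316/569/00000/file.root'; joining the two blocks in front of the
 # last directory level reconstructs the run (316569 in this example)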
1003  return [int("".join(file_name.split("/")[-4:-2]))]
1004  except ValueError:
1005  query = "run file="+file_name+" system=dbs3"
1006  return [int(_) for _ in find_key(das_client(query), ["run", "run_number"])]
1007 
1008 
1009 def get_max_run(dataset_name):
1010  """Retrieve the maximum run number in `dataset_name`.
1011 
1012  Arguments:
1013  - `dataset_name`: name of the dataset
1014  """
1015 
1016  data = das_client("run dataset={0:s} system=dbs3".format(dataset_name))
1017  runs = [f["run"][0]["run_number"] for f in data]
1018  return max(runs)
1019 
1020 
1021 def get_files(dataset_name):
1022  """Retrieve list of files in `dataset_name`.
1023 
1024  Arguments:
1025  - `dataset_name`: name of the dataset
1026  """
1027 
1028  data = das_client(("file dataset={0:s} system=dbs3 detail=True | "+
1029  "grep file.name, file.nevents > 0").format(dataset_name),
1030  "file")
1031  return [find_key(f["file"], ["name"]) for f in data]
1032 
1033 
1034 def get_datasets(dataset_pattern):
1035  """Retrieve list of datasets matching `dataset_pattern`.
1036 
1037  Arguments:
1038  - `dataset_pattern`: pattern of dataset names
1039  """
1040 
1041  data = das_client("dataset dataset={0:s} system=dbs3 detail=True"
1042  "| grep dataset.name".format(dataset_pattern), "dataset")
1043  return sorted(set([find_key(f["dataset"], ["name"]) for f in data]))
1044 
1045 
1046 def get_events_per_dataset(dataset_name):
1047  """Retrieve the number of events in `dataset_name`.
1048 
1049  Arguments:
1050  - `dataset_name`: name of a dataset
1051  """
1052 
1053  return _get_events("dataset", dataset_name)
1054 
1055 
1056 def get_events_per_file(file_name):
1057  """Retrieve the number of events in `file_name`.
1058 
1059  Arguments:
1060  - `file_name`: name of a dataset file
1061  """
1062 
1063  return _get_events("file", file_name)
1064 
1065 
1066 def _get_events(entity, name):
1067  """Retrieve the number of events from `entity` called `name`.
1068 
1069  Arguments:
1070  - `entity`: type of entity
1071  - `name`: name of entity
1072  """
1073 
1074  data = das_client("{0:s}={1:s} system=dbs3 detail=True | grep {0:s}.nevents"
1075  .format(entity, name), entity)
1076  return int(find_key(data, [entity, "nevents"]))
1077 
1078 
1079 def _get_properties(name, entity, properties, filters = None, sub_entity = None,
1080  aggregators = None):
1081  """Retrieve `properties` from `entity` called `name`.
1082 
1083  Arguments:
1084  - `name`: name of entity
1085  - `entity`: type of entity
1086  - `properties`: list of property names
1087  - `filters`: list of filters on properties
1088  - `sub_entity`: type of entity from which to extract the properties;
1089  defaults to `entity`
1090  - `aggregators`: additional aggregators/filters to amend to query
1091  """
1092 
1093  if sub_entity is None: sub_entity = entity
1094  if filters is None: filters = []
1095  props = ["{0:s}.{1:s}".format(sub_entity,prop.split()[0])
1096  for prop in properties]
1097  conditions = ["{0:s}.{1:s}".format(sub_entity, filt)
1098  for filt in filters]
1099  add_ons = "" if aggregators is None else " | "+" | ".join(aggregators)
1100 
1101  data = das_client("{0:s} {1:s}={2:s} system=dbs3 detail=True | grep {3:s}{4:s}"
1102  .format(sub_entity, entity, name,
1103  ", ".join(props+conditions), add_ons), sub_entity)
1104  return [[find_key(f[sub_entity], [prop]) for prop in properties] for f in data]
1105 
1106 def get_file_info(dataset):
1107  result = _get_properties(name=dataset,
1108  properties = ["name", "nevents"],
1109  filters = ["nevents > 0"],
1110  entity = "dataset",
1111  sub_entity = "file")
1112  return [(dataset, name, nevents) for name, nevents in result]
1113 
1114 
1115 
1116 FileInfo = collections.namedtuple("FileInfo", "dataset name nevents runs")
1117 
1118 def _make_file_info(dataset_name_nevents):
1119  return FileInfo(*dataset_name_nevents, runs=get_runs(dataset_name_nevents[1]))
1120 
1121 def get_chunks(long_list, chunk_size):
1122  """
1123  Generates list of sub-lists of `long_list` with a maximum size of
1124  `chunk_size`.
1125 
1126  Arguments:
1127  - `long_list`: original list
1128  - `chunk_size`: maximum size of created sub-lists
1129  """
1130 
1131  for i in range(0, len(long_list), chunk_size):
1132  yield long_list[i:i+chunk_size]
1133 
1134 
1135 def merge_strings(strings):
1136  """Merge strings in `strings` into a common string.
1137 
1138  Arguments:
1139  - `strings`: list of strings
1140  """
1141 
1142  if type(strings) == str:
1143  return strings
1144  elif len(strings) == 0:
1145  return ""
1146  elif len(strings) == 1:
1147  return strings[0]
1148  elif len(strings) == 2:
1149  first = strings[0]
1150  second = strings[1]
1151  else:
1152  first = merge_strings(strings[:-1])
1153  second = strings[-1]
1154 
1155  merged_string = ""
1156  blocks = difflib.SequenceMatcher(None, first, second).get_matching_blocks()
1157 
1158  last_i, last_j, last_n = 0, 0, 0
1159  for i, j, n in blocks:
1160  merged_string += first[last_i+last_n:i]
1161  merged_string += second[last_j+last_n:j]
1162  merged_string += first[i:i+n]
1163  last_i, last_j, last_n = i, j, n
1164 
1165  return str(merged_string)
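 # illustration (hypothetical dataset names): merging
 #   '/A/Run2018A-X/ALCARECO' and '/A/Run2018B-X/ALCARECO'
 # keeps the common pieces and concatenates the differing ones, giving
 #   '/A/Run2018AB-X/ALCARECO'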
1166 
1167 
1168 ################################################################################
1169 if __name__ == "__main__":
1170  try:
1171  main()
1172  except KeyboardInterrupt:
1173  pass
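A minimal usage sketch follows. The dataset name, IOV run, event count, and output path are placeholders, and the import assumes the script's directory is on PYTHONPATH; in practice the tool is normally invoked from the command line inside a CMSSW environment with a valid VOMS proxy.

  # command-line form (placeholders):
  #   tkal_create_file_lists.py -i /MinimumBias/Run2018A-TkAlMinBias-v1/ALCARECO \
  #       -n 1000000 --iov 315252 -o /some/output/dir --create-ini
  #
  # equivalent call from Python, assuming the script is importable as a module:
  import tkal_create_file_lists as tkal

  tkal.main(["--input", "/MinimumBias/Run2018A-TkAlMinBias-v1/ALCARECO",  # placeholder dataset
             "--events-for-alignment", "1000000",
             "--iov", "315252",                                           # placeholder IOV start run
             "--output-dir", "/some/output/dir",                          # placeholder path
             "--create-ini"])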