from __future__ import print_function
from builtins import range
# … (further standard-library imports elided: argparse, bisect, collections,
#     difflib, glob, math, os, random, re, signal, subprocess, sys, …)
import multiprocessing
import FWCore.PythonUtilities.LumiList as LumiList
import Utilities.General.cmssw_das_client as cmssw_das_client
import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
def main(argv = None):
    """Main routine. Not called if this module is loaded via `import`.

    Arguments:
    - `argv`: command line arguments passed to the script
    """

    file_list_creator = FileListCreator(argv)    # (instantiation reconstructed)
    file_list_creator.create()
class FileListCreator(object):
    """Create file lists for alignment and validation for a given dataset."""

    def __init__(self, argv):
        """Constructor taking the command line arguments.

        Arguments:
        - `argv`: command line arguments
        """

        # … (set up the argument parser, cf. _define_parser below)
        self._args = self._parser.parse_args(argv)

        if not mps_tools.check_proxy():
            print_msg(
                "Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
            sys.exit(1)
        if self._args.test_mode:
            import Configuration.PyReleaseValidation.relval_steps as rvs
            import Configuration.PyReleaseValidation.relval_production as rvp
            self._args.datasets = [rvs.steps[rvp.workflows[1000][1][0]]["INPUT"].dataSet]
            # … (further test-mode defaults)

        # expand the dataset patterns and apply the optional regex filter
        self._datasets = sorted([dataset
                                 for pattern in self._args.datasets
                                 for dataset in get_datasets(pattern)
                                 if re.search(self._args.dataset_filter, dataset)])
        if len(self._datasets) == 0:
            print_msg("Found no dataset matching the pattern(s):")
            for d in self._args.datasets: print_msg("\t"+d)
            sys.exit(1)
        # create the output directory (surrounding structure partly reconstructed)
        try:
            os.makedirs(self._output_dir)
        except OSError as e:
            if e.args == (17, "File exists"):
                if self._args.force:
                    pass                    # keep the directory, clear its content below
                elif self._args.use_cache:
                    self._cache.load()      # reuse the cached DAS-query results
                else:
                    print_msg("Directory '{}' already exists from previous runs"
                              " of the script. Use '--use-cache' if you want to"
                              " use the cached DAS-query results or use "
                              "'--force' to remove it."
                              .format(self._output_dir))
                    sys.exit(1)
                files = glob.glob(os.path.join(self._output_dir, "*"))
                for f in files: os.remove(f)
            else:
                raise
    def create(self):
        """Creates file list. To be called by user of the class."""

        # … (request the dataset information, create the file lists, and write them out)

    _event_count_log = "event_count_info.log"


    def _define_parser(self):                # (method name reconstructed)
        """Definition of command line argument parser."""

        parser = argparse.ArgumentParser(
            description = "Create file lists for alignment",
            epilog = ("The tool will create a directory containing all file "
                      "lists and a log file with all relevant event counts "
                      "('{}').".format(FileListCreator._event_count_log)))
        parser.add_argument("-i", "--input", dest = "datasets", required = True,
                            metavar = "DATASET", action = "append",
                            help = ("CMS dataset name; supports wildcards; "
                                    "use multiple times for multiple datasets"))
        parser.add_argument("--dataset-filter", default = "",
                            help = ("regex to match within the matched datasets, "
                                    "in case the wildcard isn't flexible enough"))
        parser.add_argument("-j", "--json", dest = "json", metavar = "PATH",
                            help = "path to JSON file (optional)")
        parser.add_argument("-f", "--fraction", dest = "fraction",
                            type = float, default = 1,
                            help = "max. fraction of files used for alignment")
        parser.add_argument("--iov", dest = "iovs", metavar = "RUN", type = int,
                            action = "append", default = [],
                            help = ("define IOV by specifying first run; for "
                                    "multiple IOVs use this option multiple "
                                    "times; files from runs before the lowest "
                                    "IOV are discarded (default: 1)"))
        parser.add_argument("--miniiov", dest="miniiovs", metavar="RUN", type=int,
                            action="append", default=[],
                            help=("in addition to the standard IOVs, break up hippy jobs "
                                  "at these points, so that jobs from before and after "
                                  "these runs are not in the same job"))
        parser.add_argument("-r", "--random", action = "store_true",
                            default = False, help = "select files randomly")
        parser.add_argument("-n", "--events-for-alignment", "--maxevents",
                            dest = "events", type = int, metavar = "NUMBER",
                            help = ("number of events needed for alignment; the"
                                    " remaining events in the dataset are used "
                                    "for validation; if n<=0, all events are "
                                    "used for validation"))
        parser.add_argument("--all-events", action = "store_true",
                            help = "Use all events for alignment")
        parser.add_argument("--tracks-for-alignment", dest = "tracks",
                            type = int, metavar = "NUMBER",
                            help = "number of tracks needed for alignment")
        parser.add_argument("--track-rate", dest = "rate", type = float,
                            help = "number of tracks per event")
        parser.add_argument("--run-by-run", dest = "run_by_run",
                            action = "store_true", default = False,
                            help = "create validation file list for each run")
        parser.add_argument("--minimum-events-in-iov",
                            dest = "minimum_events_in_iov", metavar = "NUMBER",
                            type = int, default = 100000,
                            help = ("minimum number of events for alignment per"
                                    " IOV; this option has a higher priority "
                                    "than '-f/--fraction' "
                                    "(default: %(default)s)"))
        parser.add_argument("--minimum-events-validation",
                            dest = "minimum_events_validation",
                            metavar = "NUMBER", type = int, default = 1,
                            help = ("minimum number of events for validation; "
                                    "applies to IOVs; in case of --run-by-run "
                                    "it applies to runs "
                                    "(default: %(default)s)"))
        parser.add_argument("--use-cache", dest = "use_cache",
                            action = "store_true", default = False,
                            help = "use DAS-query results of previous run")
        parser.add_argument("-o", "--output-dir", dest = "output_dir",
                            metavar = "PATH", default = os.getcwd(),
                            help = "output base directory (default: %(default)s)")
        parser.add_argument("--create-ini", dest = "create_ini",
                            action = "store_true", default = False,
                            help = ("create dataset ini file based on the "
                                    "created file lists"))
        parser.add_argument("--force", action = "store_true", default = False,
                            help = ("remove output directory from previous "
                                    "runs, if existing"))
        parser.add_argument("--hippy-events-per-job", type = int, default = 1,
                            help = ("approximate number of events in each job for HipPy"))
        parser.add_argument("--test-mode", dest = "test_mode",
                            action = "store_true", default = False,
                            help = argparse.SUPPRESS)
        # (the configured parser is used as self._parser in the constructor)
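For orientation, the following sketch shows how the options defined above might be combined in practice; the dataset pattern, IOV boundaries, and output directory are invented placeholders, not taken from the script.

# Hypothetical argument vector for the parser defined above; every value is a
# placeholder chosen purely for illustration.
example_argv = [
    "--input", "/FakePD/FakeEra-TkAlMinBias-v1/ALCARECO",
    "--iov", "273000", "--iov", "278000",
    "--events-for-alignment", "1000000",
    "--run-by-run",
    "--create-ini",
    "--output-dir", "file_lists",
]
# main(example_argv) would expand the dataset pattern via DAS and write the
# alignment and validation file lists into 'file_lists'.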
    def _validate_input(self):
        """Validate command line arguments."""

        if self._args.events is None:
            if self._args.all_events:
                self._args.events = float("inf")
                print_msg("Using all tracks for alignment")
            elif (self._args.tracks is None) and (self._args.rate is None):
                msg = ("either -n/--events-for-alignment, --all-events, or both of "
                       "--tracks-for-alignment and --track-rate are required")
                self._parser.error(msg)
            elif (((self._args.tracks is not None) and (self._args.rate is None)) or
                  ((self._args.rate is not None) and (self._args.tracks is None))):
                msg = ("--tracks-for-alignment and --track-rate must be used "
                       "together")                    # (message tail reconstructed)
                self._parser.error(msg)
            else:
                self._args.events = int(math.ceil(self._args.tracks /
                                                  self._args.rate))
                print_msg("Requested {0:d} tracks with {1:.2f} tracks/event "
                          "-> {2:d} events for alignment."
                          .format(self._args.tracks, self._args.rate,
                                  self._args.events))
        else:
            if (self._args.tracks is not None) or (self._args.rate is not None) \
                    or self._args.all_events:
                msg = ("-n/--events-for-alignment must not be used with "
                       "--tracks-for-alignment, --track-rate, or --all-events")
                self._parser.error(msg)
            print_msg("Requested {0:d} events for alignment."
                      .format(self._args.events))
        for dataset in self._args.datasets:
            if not re.match(r"^/[^/]+/[^/]+/[^/]+$", dataset):   # (pattern reconstructed)
                print_msg("Dataset pattern '"+dataset+"' is not in CMS format.")
                sys.exit(1)
        nonzero_events_per_iov = (self._args.minimum_events_in_iov > 0)
        if nonzero_events_per_iov and self._args.fraction <= 0:
            print_msg("Setting minimum number of events per IOV for alignment "
                      "to 0 because a non-positive fraction of alignment events"
                      " is chosen: {}".format(self._args.fraction))
            nonzero_events_per_iov = False
            self._args.minimum_events_in_iov = 0
        if nonzero_events_per_iov and self._args.events <= 0:
            print_msg("Setting minimum number of events per IOV for alignment "
                      "to 0 because a non-positive number of alignment events"
                      " is chosen: {}".format(self._args.events))
            nonzero_events_per_iov = False
            self._args.minimum_events_in_iov = 0
    def _prepare_iov_datastructures(self):
        """Create the needed objects for IOV handling."""

        self._iovs = sorted(set(self._args.iovs))
        if len(self._iovs) == 0: self._iovs.append(1)

        # per-IOV bookkeeping of event counts and files (attribute names reconstructed)
        self._iov_info_alignment  = {iov: {"events": 0, "files": []}
                                     for iov in self._iovs}
        self._iov_info_validation = {iov: {"events": 0, "files": []}
                                     for iov in self._iovs}

        # … (analogous preparation for the mini-IOVs used by the HipPy job splitting)


    def _get_iovs(self, runs, useminiiovs = False):
        """
        Return the IOV start for `run`. Returns 'None' if the run is before any
        defined IOV.

        Arguments:
        - `runs`: run numbers
        """

        iovlist = self._miniiovs if useminiiovs else self._iovs   # (reconstructed)

        iovs = []
        for run in runs:
            iov_index = bisect.bisect(iovlist, run)
            if iov_index > 0: iovs.append(iovlist[iov_index-1])
        return iovs
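The IOV lookup above relies on `bisect`; the following self-contained snippet illustrates the idea with invented IOV boundaries.

# toy illustration of the bisect-based IOV lookup (IOV boundaries are invented)
import bisect

iovlist = [1, 273000, 278000]              # first runs of the defined IOVs, sorted
for run in (200000, 273500, 280000):
    iov_index = bisect.bisect(iovlist, run)
    if iov_index > 0:
        print(run, "-> IOV since", iovlist[iov_index-1])
# 200000 -> IOV since 1
# 273500 -> IOV since 273000
# 280000 -> IOV since 278000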
    def _prepare_run_datastructures(self):
        """Create the needed objects for run-by-run validation file lists."""

        # … (analogous per-run bookkeeping)


    def _add_file_info(self, container, keys, fileinfo):
        """Add file with `file_name` to `container` using `key`.

        Arguments:
        - `container`: dictionary holding information on files and event counts
        - `keys`: keys to which the info should be added; will be created if not
                  existing
        - `file_name`: name of a dataset file
        """

        for key in keys:
            if key not in container:
                container[key] = {"events": 0,
                                  "files": []}
            container[key]["events"] += fileinfo.nevents / len(keys)
            if fileinfo not in container[key]["files"]:
                container[key]["files"].append(fileinfo)


    def _remove_file_info(self, container, keys, fileinfo):
        """Remove file with `file_name` from `container` using `key`.

        Arguments:
        - `container`: dictionary holding information on files and event counts
        - `keys`: keys from which the info should be removed
        - `file_name`: name of a dataset file
        - `event_count`: number of events in `file_name`
        """

        for key in keys:
            if key not in container: continue
            try:
                index = container[key]["files"].index(fileinfo)
            except ValueError:          # file not booked under this key (error handling reconstructed)
                continue
            del container[key]["files"][index]
            container[key]["events"] -= fileinfo.nevents / len(keys)
    def _request_dataset_information(self):
        """Retrieve general dataset information and create file list."""

        if not self._cache.empty:
            # … (take event counts, file list, and file info from the cache)
            if self._args.random: random.shuffle(self._files)
            return                      # (early return reconstructed)

        # one worker process per dataset, but at least one and at most
        # the number of cores minus one
        number_of_processes = multiprocessing.cpu_count() - 1
        number_of_processes = (number_of_processes
                               if number_of_processes > 0
                               else 1)
        pool = multiprocessing.Pool(
            processes = number_of_processes,
            initializer = lambda: signal.signal(signal.SIGINT, signal.SIG_IGN))

        print_msg("Requesting information for the following dataset(s):")
        for d in self._datasets: print_msg("\t"+d)
        # …

        result = pool.map_async(get_events_per_dataset, self._datasets).get(sys.maxsize)
        # … (sum up the total number of events in the dataset(s))

        result = pool.map_async(get_max_run, self._datasets).get(sys.maxsize)
        # … (keep the highest run number)

        result = sum(pool.map_async(get_file_info, self._datasets).get(sys.maxsize), [])
        files = pool.map_async(_make_file_info, result).get(sys.maxsize)
        # …

        if self._args.test_mode:
            ...                         # keep only a small subset of the files
        if self._args.random:
            ...                         # shuffle the file order
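The parallel DAS queries above follow a common pattern: the worker processes ignore SIGINT so that Ctrl-C interrupts only the parent, and `.get()` is given a huge timeout so the wait stays interruptible. A minimal, self-contained sketch of that pattern; the helper names and the toy query are invented, not part of the script.

import multiprocessing
import signal
import sys

def _ignore_sigint():                      # hypothetical helper name
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def _toy_query(dataset):                   # stand-in for a real DAS query
    return len(dataset)

if __name__ == "__main__":
    pool = multiprocessing.Pool(
        processes = max(multiprocessing.cpu_count() - 1, 1),
        initializer = _ignore_sigint)
    counts = pool.map_async(_toy_query, ["/Fake/Dataset-A/ALCARECO",
                                         "/Fake/Dataset-B/ALCARECO"]).get(sys.maxsize)
    pool.close()
    pool.join()
    print(counts)                          # -> [24, 24]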
    def _create_file_lists(self):
        """Create file lists for alignment and validation."""

        # … (reset the alignment/validation bookkeeping)
        max_range = (0                      # (value for the n<=0 case reconstructed)
                     if self._args.events <= 0
                     else int(math.ceil(len(self._files)*self._args.fraction)))

        use_for_alignment = True
        for i, fileinfo in enumerate(self._file_info):
            enough_events = ...             # elided: requested alignment events already collected?
            fraction_exceeded = i >= max_range
            if enough_events or fraction_exceeded: use_for_alignment = False

            dataset, f, number_of_events, runs = fileinfo
            # … (determine the IOVs touched by this file)
            if use_for_alignment:
                # … (book the events per IOV for alignment)
                self._files_alignment.append(fileinfo)
            else:
                # … (book the events per IOV for validation)
                self._files_validation.append(fileinfo)
            # …
            if self._args.run_by_run:       # (placement reconstructed)
                ...                         # additionally book the validation events per run
    def _fulfill_iov_eventcount(self):
        """
        Try to fulfill the requirement on the minimum number of events per IOV
        in the alignment file list by picking files from the validation list.
        """

        for iov in self._iovs:
            if ...: continue                # elided: skip IOVs that already have enough events
            for fileinfo in ...:            # elided: candidate files from the validation list
                dataset, f, number_of_events, runs = fileinfo
                # … (move the file's events from the validation to the alignment bookkeeping)
                self._files_alignment.append(fileinfo)
                # …
                if self._args.run_by_run:
                    ...                     # also update the per-run bookkeeping
                self._files_validation.remove(fileinfo)
                # …
                if (...                     # elided: alignment events collected in this IOV
                        >= self._args.minimum_events_in_iov):
                    break                   # requirement fulfilled for this IOV (reconstructed)
    def _split_hippy_jobs(self):
        # group the alignment files into HipPy jobs per dataset and mini-IOV
        # (surrounding loop structure reconstructed from context)
        hippyjobs = {}
        for dataset, miniiov in itertools.product(self._datasets, self._miniiovs):
            jobsforminiiov = []
            hippyjobs[dataset,miniiov] = jobsforminiiov
            eventsinthisjob = float("inf")
            for fileinfo in self._files_alignment:           # (iterable reconstructed)
                if fileinfo.dataset != dataset: continue
                miniiovs = set(self._get_iovs(fileinfo.runs, useminiiovs=True))
                if miniiov not in miniiovs: continue
                if len(miniiovs) > 1:
                    hippyjobs[dataset,miniiov] = []
                if eventsinthisjob >= self._args.hippy_events_per_job:
                    currentjob = []
                    jobsforminiiov.append(currentjob)
                    eventsinthisjob = 0
                currentjob.append(fileinfo)
                # …
                eventsinthisjob += fileinfo.nevents

        self._hippy_jobs = {
            (dataset, iov): sum((hippyjobs[dataset, miniiov]
                                 for miniiov in self._miniiovs                # (reconstructed)
                                 if iov == max(_
                                               for _ in self._iovs
                                               if _ <= miniiov)), [])
            for dataset, iov in itertools.product(self._datasets, self._iovs) # (reconstructed)
        }
    def _print_eventcounts(self):
        """Print the event counts per file list and per IOV."""

        log = os.path.join(self._output_dir, FileListCreator._event_count_log)

        print_msg("Using {0:d} events for alignment ({1:.2f}%)."
                  .format(...), log_file = log)              # format arguments elided
        for iov in sorted(self._iovs):                       # (loop structure reconstructed)
            print_msg(("Approximate events" if self.rereco else "Events") +
                      " for alignment in IOV since {0:d}: {1:d}".format(...),
                      log_file = log)

        print_msg("Using {0:d} events for validation ({1:.2f}%)."
                  .format(...), log_file = log)
        for iov in sorted(self._iovs):
            msg = (("Approximate events" if self.rereco else "Events") +
                   " for validation in IOV since {0:d}: {1:d}".format(...))
            if (... < self._args.minimum_events_validation):
                msg += " (not enough events -> no dataset file will be created)"
            print_msg(msg, log_file = log)

        if self._args.run_by_run:                            # (loop structure reconstructed)
            for run in ...:
                msg = (("Approximate events" if self.rereco else "Events") +
                       " for validation in run {0:d}: {1:d}".format(...))
                if (... < self._args.minimum_events_validation):
                    msg += " (not enough events -> no dataset file will be created)"
                print_msg(msg, log_file = log)

        print_msg("Unused events: {0:d} ({1:.2f}%)".format(...), log_file = log)


    def _create_dataset_ini_section(self, name, collection, json_file = None):
        """Write dataset ini snippet.

        Arguments:
        - `name`: name of the dataset section
        - `collection`: track collection of this dataset
        - `json_file`: JSON file to be used for this dataset (optional)
        """

        if json_file:
            splitted = name.split("_since")
            file_list = "_since".join(splitted[:-1]
                                      if len(splitted) > 1
                                      else splitted)         # (branch reconstructed)
        else:
            file_list = name

        output  = "[dataset:{}]\n".format(name)
        output += "collection = {}\n".format(collection)
        output += "inputFileList = ${{datasetdir}}/{}.txt\n".format(file_list)
        output += "json = ${{datasetdir}}/{}\n".format(json_file) if json_file else ""

        if collection in ("ALCARECOTkAlCosmicsCTF0T",
                          "ALCARECOTkAlCosmicsInCollisions"):
            # …
            print_msg("\tDetermined cosmics dataset, i.e. please replace "
                      "'DUMMY_DECO_MODE_FLAG' and 'DUMMY_ZERO_TESLA_FLAG' "
                      "with the correct values.")
            output += "cosmicsDecoMode  = DUMMY_DECO_MODE_FLAG\n"
            output += "cosmicsZeroTesla = DUMMY_ZERO_TESLA_FLAG\n"

        # …
        return output                                        # (reconstructed)


    def _create_json_file(self, name, first, last = None):
        """
        Create JSON file with `name` covering runs from `first` to `last`. If a
        global JSON is provided, the resulting file is the intersection of the
        file created here and the global one.
        Returns the name of the created JSON file.

        Arguments:
        - `name`: name of the created JSON file
        - `first`: first run covered by the JSON file
        - `last`: last run covered by the JSON file
        """

        if last is None: last = self._max_run
        # … (build a LumiList for the run range and load the global JSON, if any)
        json_file = json_file & global_json
        json_file.writeJSON(os.path.join(self._output_dir, name))
        # … (return the name of the created JSON file)
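A small sketch of the LumiList intersection performed in `_create_json_file` above; the JSON file names are placeholders, and the snippet assumes a CMSSW environment where FWCore.PythonUtilities.LumiList is available.

# illustration only: intersect a local IOV JSON with a global JSON selection
import FWCore.PythonUtilities.LumiList as LumiList

local_json  = LumiList.LumiList(filename = "IOV_since_273000.json")    # placeholder name
global_json = LumiList.LumiList(filename = "global_selection.json")    # placeholder name
combined    = local_json & global_json       # keep only lumi sections present in both
combined.writeJSON("IOV_since_273000_intersected.json")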
    def _get_track_collection(self, edm_file):
        """Extract track collection from given `edm_file`.

        Arguments:
        - `edm_file`: CMSSW dataset file
        """

        # first try to dump the event content of the file
        cmd = ["edmDumpEventContent",
               r"root://cms-xrd-global.cern.ch/"+edm_file]
        try:
            event_content = subprocess.check_output(cmd).split("\n")
        except subprocess.CalledProcessError as e:
            # fall back to guessing the collection from the file name
            # (branch structure partly reconstructed)
            splitted = edm_file.split("/")
            if "ALCARECO" in splitted:
                alcareco = splitted[splitted.index("ALCARECO")+1].split("-")[0]
                alcareco = alcareco.replace("TkAlCosmics0T", "TkAlCosmicsCTF0T")
                alcareco = "ALCARECO" + alcareco
                print_msg("\tDetermined track collection as '{}'.".format(alcareco))
                return alcareco
            if "RECO" in splitted:
                print_msg("\tDetermined track collection as 'generalTracks'.")
                return "generalTracks"
            print_msg("\tCould not determine track collection "
                      "from the file name.")                 # (message tail reconstructed)
            print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
                      "the correct value.")
            return "DUMMY_TRACK_COLLECTION"

        track_collections = []
        for line in event_content:
            splitted = line.split()
            if len(splitted) > 0 and splitted[0] == r"vector<reco::Track>":
                track_collections.append(splitted[1].strip().strip('"'))

        if len(track_collections) == 0:
            ...                                              # elided: no collection found
        elif len(track_collections) == 1:
            print_msg("\tDetermined track collection as "
                      "'{}'.".format(track_collections[0]))
            return track_collections[0]

        alcareco_tracks = list(filter(lambda x: x.startswith("ALCARECO"),
                                      track_collections))    # list(...) added for Python 3
        if len(alcareco_tracks) == 0 and "generalTracks" in track_collections:
            print_msg("\tDetermined track collection as 'generalTracks'.")
            return "generalTracks"
        elif len(alcareco_tracks) == 1:
            print_msg("\tDetermined track collection as "
                      "'{}'.".format(alcareco_tracks[0]))
            return alcareco_tracks[0]
        print_msg("\tCould not unambiguously determine track collection in "
                  "file '{}':".format(edm_file))
        print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
                  "the correct value from the following list.")
        for collection in track_collections:
            print_msg("\t"+collection)
        return "DUMMY_TRACK_COLLECTION"


    def _write_file_lists(self):
        """Write file lists to disk."""

        # … (write the alignment/validation file lists and the HipPy job lists)
        if self._args.create_ini:
            dataset_ini_general = "[general]\n"
            # …
            dataset_ini_general += ("json = {}\n\n".format(self._args.json)
                                    if self._args.json
                                    else "\n")               # (else branch reconstructed)

            # … (choose the ini file name and the track collection)
            print_msg("Creating dataset ini file: " + ini_path)
            ini_path = os.path.join(self._output_dir, ini_path)
            # …
            with open(ini_path, "w") as f:
                f.write(dataset_ini_general)
                # … (plus the per-dataset section)

            iov_wise_ini = dataset_ini_general

        for i,iov in enumerate(sorted(self._iovs)):
            iov_str = "since{0:d}".format(iov)
            # … (create the JSON file covering this IOV)
            if i == len(self._iovs) - 1:
                last = None                                  # last IOV reaches up to the highest run (reconstructed)
            else:
                last = sorted(self._iovs)[i+1] - 1
            # …
            if self._args.create_ini:
                ...                                          # elided: append the IOV-wise ini section
            # … (write the alignment dataset files for this IOV)
            self._create_dataset_cff(                        # (call target reconstructed)
                "_".join(["Alignment", iov_str]),
                ...,                                         # elided: file list of this IOV
                json_file=local_json)

            if (... < self._args.minimum_events_validation): # elided: validation events in this IOV
                continue
            self._create_dataset_cff(                        # (call target reconstructed)
                "_".join(["Validation", iov_str]),
                ...,
                json_file=local_json)

        if self._args.create_ini and iov_wise_ini != dataset_ini_general:
            # … (choose the IOV-wise ini file name)
            print_msg("Creating dataset ini file: " + ini_path)
            ini_path = os.path.join(self._output_dir, ini_path)
            with open(ini_path, "w") as f: f.write(iov_wise_ini)

        if self._args.run_by_run:                            # (surrounding loop reconstructed)
            for run in ...:
                if args.rereco: continue
                if (... < self._args.minimum_events_validation):
                    continue
                # … (write the per-run validation dataset files)
                ...
    def _create_dataset_txt(self, name, file_list):
        """Write alignment file list to disk.

        Arguments:
        - `name`: name of the file list
        - `file_list`: list of files to write to `name`
        """

        # …
        print_msg("Creating dataset file list: "+name)
        with open(os.path.join(self._output_dir, name), "w") as f:
            f.write("\n".join(fileinfo.name for fileinfo in file_list))
    def _create_hippy_txt(self, name, job_list):
        # write the HipPy job lists to disk (one line per job, files comma-separated)
        print_msg("Creating dataset file list for HipPy: "+name)
        with open(os.path.join(self._output_dir, name), "w") as f:
            f.write("\n".join(",".join("'"+fileinfo.name+"'" for fileinfo in job)
                              for job in job_list)+"\n")
    def _create_dataset_cff(self, name, file_list, json_file = None):
        """
        Create configuration fragment to define a dataset.

        Arguments:
        - `name`: name of the configuration fragment
        - `file_list`: list of files to write to `name`
        - `json_file`: JSON file to be used for this dataset (optional)
        """

        if json_file is None: json_file = self._args.json
        if json_file is not None:
            json_file = os.path.join(self._output_dir, json_file)

        name = "_".join(["Dataset",name, "cff.py"])
        print_msg("Creating dataset configuration fragment: "+name)

        file_list_str = ""
        for sub_list in get_chunks(file_list, 255):          # (chunk loop reconstructed)
            file_list_str += ("readFiles.extend([\n'"+
                              "',\n'".join(fileinfo.name for fileinfo in sub_list)+
                              "'\n])\n")                     # (closing piece reconstructed)

        fragment = FileListCreator._dataset_template.format(
            lumi_def = ("import FWCore.PythonUtilities.LumiList as LumiList\n\n"
                        "lumiSecs = cms.untracked.VLuminosityBlockRange()\n"
                        "goodLumiSecs = LumiList.LumiList(filename = "
                        "'{0:s}').getCMSSWString().split(',')"
                        .format(json_file)                   # (format call reconstructed)
                        if json_file else ""),
            lumi_arg = ("lumisToProcess = lumiSecs,\n "
                        if json_file else ""),
            lumi_extend = "lumiSecs.extend(goodLumiSecs)" if json_file else "",
            files = file_list_str)

        with open(os.path.join(self._output_dir, name), "w") as f:
            f.write(fragment)                                # (reconstructed)

    _dataset_template = """\
import FWCore.ParameterSet.Config as cms
{lumi_def:s}
readFiles = cms.untracked.vstring()
source = cms.Source("PoolSource",
                    {lumi_arg:s}fileNames = readFiles)
{files:s}{lumi_extend:s}
maxEvents = cms.untracked.PSet(input = cms.untracked.int32(-1))
"""


class _DasCache(object):                                     # (class name reconstructed)
    """Helper class to cache information from DAS requests."""

    def __init__(self, file_list_id):
        """Constructor of the cache.

        Arguments:
        - `file_list_id`: ID of the cached file lists
        """
        # …

    def reset(self):                                         # (method name reconstructed)
        """Reset the cache contents and the 'empty' flag."""
        # …

    def set(self, total_events, file_list, file_info, max_run):
        """Set the content of the cache.

        Arguments:
        - `total_events`: total number of events in dataset
        - `file_list`: list of files in dataset
        - `file_info`: dictionary with numbers of events per file
        - `max_run`: highest run number contained in the dataset
        """
        # …

    def get(self):
        """
        Get the content of the cache as tuple:
           result = (total number of events in dataset,
                     list of files in dataset,
                     dictionary with numbers of events and runs per file)
        """
        # …

    def load(self):
        """Loads the cached contents."""

        print_msg("Overriding file information with cached information.")
        try:
            with open(self._cache_file_name, "rb") as f:     # (attribute name reconstructed)
                tmp_dict = cPickle.load(f)
            self.__dict__.update(tmp_dict)
        except IOError as e:
            if e.args == (2, "No such file or directory"):
                msg = ...                # elided: note that no cache file was found
                msg += " Keeping the previous file information."
                print_msg(msg)
            else:
                raise

    def dump(self):
        """Dumps the contents to the cache file."""

        if self.empty:
            print_msg("Cache is empty. Not writing to file.")
            return
        with open(self._cache_file_name, "wb") as f:         # (attribute name reconstructed)
            cPickle.dump(self.__dict__, f, 2)

    @property
    def empty(self):
        """
        Flag indicating whether the cache is empty or has been filled (possibly
        partially).
        """
        # …


def das_client(query, check_key = None):
    """
    Submit `query` to DAS client and handle possible errors.
    Further treatment of the output might be necessary.

    Arguments:
    - `query`: DAS query
    - `check_key`: optional key to be checked for; retriggers query if needed
    """

    das_data = {"status": "error",
                "reason": "DAS client returned no valid data."}   # (initialisation reconstructed)
    for _ in range(5):                                       # the query is retried up to 5 times
        try:
            das_data = cmssw_das_client.get_data(query)      # (call details reconstructed)
        except ValueError as e:
            if str(e) == "No JSON object could be decoded":
                continue                                     # retry on a malformed response (reconstructed)
            raise

        if das_data["status"] == "ok":
            if das_data["nresults"] == 0 or check_key is None:
                break
            result_count = 0
            for d in find_key(das_data["data"], [check_key]):
                result_count += len(d)
            if result_count == 0:
                das_data["status"] = "error"
                das_data["reason"] = ("DAS did not return required data.")
            else:
                break

    if das_data["status"] == "error":
        print_msg("DAS query '{}' failed 5 times. "
                  "The last time for the following reason:".format(query))
        print(das_data["reason"])
        sys.exit(1)                                          # (reconstructed)
    return das_data["data"]
def find_key(collection, key_chain):
    """Searches for `key` in `collection` and returns first corresponding value.

    Arguments:
    - `collection`: list of dictionaries
    - `key_chain`: chain of keys to be searched for
    """

    result = None
    for i,key in enumerate(key_chain):
        for item in collection:
            if key in item:                                  # (guard reconstructed)
                if i == len(key_chain) - 1:
                    result = item[key]                       # (reconstructed)
                else:
                    result = find_key(item[key], key_chain[i+1:])
            if result is not None: return result

    raise LookupError(key_chain, collection)
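A toy illustration of how `find_key` digs through the nested list-of-dictionaries structures returned by DAS; the data below is invented.

# invented data mimicking the nested layout of a DAS response
data = [{"file": [{"name": "file_A.root", "nevents": 1000}]},
        {"file": [{"name": "file_B.root", "nevents": 2000}]}]

print(find_key(data[0]["file"], ["name"]))     # -> file_A.root
print(find_key(data, ["file", "nevents"]))     # -> 1000 (first match wins)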
def print_msg(text, line_break = True, log_file = None):
    """Formatted printing of `text`.

    Arguments:
    - `text`: string to be printed
    """

    msg = " >>> " + str(text)
    # … (print `msg`, with or without a trailing line break)
    if log_file:                                             # (guard reconstructed)
        with open(log_file, "a") as f: f.write(msg+"\n")
def get_runs(file_name):
    """
    Try to guess the run number from `file_name`. If the run could not be
    determined, get the run numbers from DAS (slow!).

    Arguments:
    - `file_name`: name of the considered file
    """

    try:
        return [int("".join(file_name.split("/")[-4:-2]))]
    except ValueError:                                       # fall back to DAS (structure reconstructed)
        pass

    query = "run file="+file_name+" system=dbs3"
    # … (submit the query via das_client and extract the run numbers)


def get_max_run(dataset_name):
    """Retrieve the maximum run number in `dataset_name`.

    Arguments:
    - `dataset_name`: name of the dataset
    """

    # … (DAS query for the runs of the dataset)
    runs = [f["run"][0]["run_number"] for f in data]
    return max(runs)                                         # (reconstructed)
def get_files(dataset_name):
    """Retrieve list of files in `dataset_name`.

    Arguments:
    - `dataset_name`: name of the dataset
    """

    data = das_client(("file dataset={0:s} system=dbs3 detail=True | "+
                       "grep file.name, file.nevents > 0").format(dataset_name),
                      "file")                                # (check_key reconstructed)
    return [find_key(f["file"], ["name"]) for f in data]
def get_datasets(dataset_pattern):
    """Retrieve list of datasets matching `dataset_pattern`.

    Arguments:
    - `dataset_pattern`: pattern of dataset names
    """

    data = das_client("dataset dataset={0:s} system=dbs3 detail=True"
                      "| grep dataset.name".format(dataset_pattern),
                      "dataset")
    return sorted(set([find_key(f["dataset"], ["name"]) for f in data]))
def get_events_per_dataset(dataset_name):
    """Retrieve the number of events in `dataset_name`.

    Arguments:
    - `dataset_name`: name of a dataset
    """

    return _get_events("dataset", dataset_name)              # (body reconstructed)


def get_events_per_file(file_name):
    """Retrieve the number of events in `file_name`.

    Arguments:
    - `file_name`: name of a dataset file
    """

    return _get_events("file", file_name)                    # (body reconstructed)


def _get_events(entity, name):
    """Retrieve the number of events from `entity` called `name`.

    Arguments:
    - `entity`: type of entity
    - `name`: name of entity
    """

    data = das_client("{0:s}={1:s} system=dbs3 detail=True | grep {0:s}.nevents"
                      .format(entity, name), entity)
    # … (extract the 'nevents' field from the response, cf. find_key)
def _get_properties(name, entity, properties, filters = None, sub_entity = None,
                    aggregators = None):
    """Retrieve `properties` from `entity` called `name`.

    Arguments:
    - `name`: name of entity
    - `entity`: type of entity
    - `properties`: list of property names
    - `filters`: list of filters on properties
    - `sub_entity`: type of entity from which to extract the properties;
                    defaults to `entity`
    - `aggregators`: additional aggregators/filters to amend to query
    """

    if sub_entity is None: sub_entity = entity
    if filters is None: filters = []
    props = ["{0:s}.{1:s}".format(sub_entity, prop.split()[0])
             for prop in properties]
    conditions = ["{0:s}.{1:s}".format(sub_entity, filt)
                  for filt in filters]
    add_ons = "" if aggregators is None else " | "+" | ".join(aggregators)

    data = das_client("{0:s} {1:s}={2:s} system=dbs3 detail=True | grep {3:s}{4:s}"
                      .format(sub_entity, entity, name,
                              ", ".join(props+conditions), add_ons), sub_entity)
    return [[find_key(f[sub_entity], [prop]) for prop in properties]
            for f in data]
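To make the query construction above concrete, this is what the individual pieces evaluate to for an invented call:

# illustration of the query pieces built in _get_properties (inputs invented)
sub_entity = "file"
properties = ["name", "nevents"]
filters    = ["nevents > 0"]
props      = ["{0:s}.{1:s}".format(sub_entity, prop.split()[0]) for prop in properties]
conditions = ["{0:s}.{1:s}".format(sub_entity, filt) for filt in filters]
print(", ".join(props + conditions))
# -> file.name, file.nevents, file.nevents > 0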
def get_file_info(dataset):
    # return (dataset, file name, number of events) for every non-empty file
    result = _get_properties(name = dataset,                 # (leading arguments reconstructed)
                             entity = "dataset",
                             properties = ["name", "nevents"],
                             filters = ["nevents > 0"],
                             sub_entity = "file")
    return [(dataset, name, nevents) for name, nevents in result]
FileInfo = collections.namedtuple("FileInfo", "dataset name nevents runs")


def _make_file_info(dataset_name_nevents):
    # attach the run numbers to the (dataset, name, nevents) tuple
    return FileInfo(*dataset_name_nevents,
                    runs = get_runs(dataset_name_nevents[1]))


def get_chunks(long_list, chunk_size):
    """
    Generates list of sub-lists of `long_list` with a maximum size of
    `chunk_size`.

    Arguments:
    - `long_list`: original list
    - `chunk_size`: maximum size of created sub-lists
    """

    for i in range(0, len(long_list), chunk_size):
        yield long_list[i:i+chunk_size]
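A quick usage example for `get_chunks` with invented file names:

names = ["file_{0:d}.root".format(i) for i in range(7)]     # placeholder names
print(list(get_chunks(names, 3)))
# [['file_0.root', 'file_1.root', 'file_2.root'],
#  ['file_3.root', 'file_4.root', 'file_5.root'],
#  ['file_6.root']]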
def merge_strings(strings):
    """Merge strings in `strings` into a common string.

    Arguments:
    - `strings`: list of strings
    """

    if type(strings) == str:
        return strings
    elif len(strings) == 0:
        return ""
    elif len(strings) == 1:
        return strings[0]
    elif len(strings) == 2:
        first = strings[0]                                   # (branch bodies reconstructed)
        second = strings[1]
    else:
        first = merge_strings(strings[:-1])
        second = strings[-1]

    merged_string = ""
    blocks = difflib.SequenceMatcher(None, first, second).get_matching_blocks()

    last_i, last_j, last_n = 0, 0, 0
    for i, j, n in blocks:
        merged_string += first[last_i+last_n:i]
        merged_string += second[last_j+last_n:j]
        merged_string += first[i:i+n]
        last_i, last_j, last_n = i, j, n

    return str(merged_string)
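A usage example for `merge_strings` with invented inputs: matching parts are kept once, differing parts are concatenated.

print(merge_strings(["/Fake/Run2018A-TkAlMinBias-v1/ALCARECO",
                     "/Fake/Run2018B-TkAlMinBias-v1/ALCARECO"]))
# -> /Fake/Run2018AB-TkAlMinBias-v1/ALCARECO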
if __name__ == "__main__":
    try:                                                     # (wrapper reconstructed)
        main()
    except KeyboardInterrupt:
        pass
def _get_track_collection(self, edm_file)
def _validate_input(self)
def _create_dataset_txt(self, name, file_list)
def _create_dataset_ini_section(self, name, collection, json_file=None)
def set(self, total_events, file_list, file_info, max_run)
def _get_events(entity, name)
def __init__(self, file_list_id)
def _prepare_iov_datastructures(self)
def _request_dataset_information(self)
def get_events_per_file(file_name)
def get_events_per_dataset(dataset_name)
def get_chunks(long_list, chunk_size)
def _get_iovs(self, runs, useminiiovs=False)
def _create_file_lists(self)
def print_msg(text, line_break=True, log_file=None)
def get_max_run(dataset_name)
def _add_file_info(self, container, keys, fileinfo)
def _split_hippy_jobs(self)
def _remove_file_info(self, container, keys, fileinfo)
def _create_json_file(self, name, first, last=None)
def _print_eventcounts(self)
def _create_dataset_cff(self, name, file_list, json_file=None)
def get_file_info(dataset)
def _prepare_run_datastructures(self)
def merge_strings(strings)
def _write_file_lists(self)
def _get_properties(name, entity, properties, filters=None, sub_entity=None, aggregators=None)
def _fulfill_iov_eventcount(self)
def find_key(collection, key_chain)
def get_data(query, limit=None, threshold=None, idx=None, host=None, cmd=None)
def _create_hippy_txt(self, name, job_list)
def get_datasets(dataset_pattern)
def _make_file_info(dataset_name_nevents)
def das_client(query, check_key=None)
def get_files(dataset_name)