15 """Main program to run all kinds of harvesting. 17 These are the basic kinds of harvesting implemented (contact me if 18 your favourite is missing): 20 - RelVal : Run for release validation samples. Makes heavy use of MC 23 - RelValFS: FastSim RelVal. 25 - MC : Run for MC samples. 27 - DQMOffline : Run for real data (could also be run for MC). 29 For the mappings of these harvesting types to sequence names please 30 see the setup_harvesting_info() and option_handler_list_types() 34 from __future__
import print_function
38 from builtins
import range
39 __version__ =
"3.8.2p1" 40 __author__ =
"Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
41 "Niklas Pietsch (niklas.pietsch@desy.de)" 43 twiki_url =
"https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester" 99 from inspect
import getargspec
100 from random
import choice
104 from DBSAPI.dbsApi
import DbsApi
107 from functools
import reduce
110 global SAXParseException
112 from xml.sax
import SAXParseException
114 import Configuration.PyReleaseValidation
115 from Configuration.PyReleaseValidation.ConfigBuilder
import \
116 ConfigBuilder, defaultOptions
        return repr(self.msg)

        return repr(self.msg)
153 """Helper class to add some customised help output to cmsHarvester. 155 We want to add some instructions, as well as a pointer to the CMS 165 usage_lines.append(sep_line)
166 usage_lines.append(
"Welcome to the CMS harvester, a (hopefully useful)")
167 usage_lines.append(
"tool to create harvesting configurations.")
168 usage_lines.append(
"For more information please have a look at the CMS Twiki:")
169 usage_lines.append(
" %s" % twiki_url)
170 usage_lines.append(sep_line)
171 usage_lines.append(
"")
175 usage_lines.append(optparse.IndentedHelpFormatter. \
178 formatted_usage =
"\n".
join(usage_lines)
179 return formatted_usage
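    # Hedged usage sketch (not part of the original file; `MyHelpFormatter'
    # below simply stands in for the formatter class defined here): optparse
    # accepts such a formatter when the parser is constructed,
    #
    #   parser = optparse.OptionParser(formatter=MyHelpFormatter())
    #   parser.print_help()   # uses the customised format_usage() above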
188 """XML handler class to parse DBS results. 190 The tricky thing here is that older DBS versions (2.0.5 and 191 earlier) return results in a different XML format than newer 192 versions. Previously the result values were returned as attributes 193 to the `result' element. The new approach returns result values as 194 contents of named elements. 196 The old approach is handled directly in startElement(), the new 197 approach in characters(). 199 NOTE: All results are returned in the form of string values of 210 "dataset.tag" :
"PROCESSEDDATASET_GLOBALTAG",
211 "datatype.type" :
"PRIMARYDSTYPE_TYPE",
212 "run" :
"RUNS_RUNNUMBER",
213 "run.number" :
"RUNS_RUNNUMBER",
214 "file.name" :
"FILES_LOGICALFILENAME",
215 "file.numevents" :
"FILES_NUMBEROFEVENTS",
216 "algo.version" :
"APPVERSION_VERSION",
217 "site" :
"STORAGEELEMENT_SENAME",
237 key = DBSXMLHandler.mapping[name]
238 value =
str(attrs[key])
248 "closing unopenend element `%s'" % name
271 """Make sure that all results arrays have equal length. 273 We should have received complete rows from DBS. I.e. all 274 results arrays in the handler should be of equal length. 281 if len(res_names) > 1:
282 for res_name
in res_names[1:]:
283 res_tmp = self.
results[res_name]
284 if len(res_tmp) != len(self.
results[res_names[0]]):
285 results_valid =
False 296 """Class to perform CMS harvesting. 298 More documentation `obviously' to follow. 305 "Initialize class and process command line options." 332 "single-step-allow-partial",
425 "dqm/offline/harvesting_output/" 485 if cmd_line_opts
is None:
486 cmd_line_opts = sys.argv[1:]
490 log_handler = logging.StreamHandler()
493 log_formatter = logging.Formatter(
"%(message)s")
494 log_handler.setFormatter(log_formatter)
495 logger = logging.getLogger()
497 logger.addHandler(log_handler)
509 "Clean up after ourselves." 521 "Create a timestamp to use in the created config files." 523 time_now = datetime.datetime.utcnow()
525 time_now = time_now.replace(microsecond = 0)
526 time_stamp =
"%sUTC" % datetime.datetime.isoformat(time_now)
534 "Spit out an identification string for cmsHarvester.py." 536 ident_str =
"`cmsHarvester.py " \
537 "version %s': cmsHarvester.py %s" % \
539 reduce(
lambda x, y: x+
' '+y, sys.argv[1:]))
546 """Create the conditions string needed for `cmsDriver'. 548 Just glueing the FrontierConditions bit in front of it really. 559 if globaltag.lower().
find(
"conditions") > -1:
560 conditions_string = globaltag
562 conditions_string =
"FrontierConditions_GlobalTag,%s" % \
566 return conditions_string
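    # For example (the GlobalTag value below is illustrative only): a
    # GlobalTag like "GR_R_35X_V8::All" is turned into
    # "FrontierConditions_GlobalTag,GR_R_35X_V8::All", whereas a string
    # that already contains "conditions" is passed through unchanged.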
571 """Return the database account name used to store the GlobalTag. 573 The name of the database account depends (albeit weakly) on 574 the CMSSW release version. 580 account_name =
"CMS_COND_31X_GLOBALTAG" 588 """See db_account_name_cms_cond_globaltag.""" 591 version = self.cmssw_version[6:11]
592 if version <
"3_4_0":
593 account_name =
"CMS_COND_31X_DQM_SUMMARY" 595 account_name =
"CMS_COND_34X" 603 "Create a nice header to be used to mark the generated files." 609 tmp.append(
"# %s" % time_stamp)
610 tmp.append(
"# WARNING: This file was created automatically!")
612 tmp.append(
"# Created by %s" % ident_str)
614 header =
"\n".
join(tmp)
622 """Adjust the level of output generated. 625 - normal : default level of output 626 - quiet : less output than the default 627 - verbose : some additional information 628 - debug : lots more information, may be overwhelming 630 NOTE: The debug option is a bit special in the sense that it 631 also modifies the output format. 638 "NORMAL" : logging.INFO,
639 "QUIET" : logging.WARNING,
640 "VERBOSE" : logging.INFO,
641 "DEBUG" : logging.DEBUG
644 output_level = output_level.upper()
652 self.
logger.fatal(
"Unknown output level `%s'" % ouput_level)
661 """Switch to debug mode. 663 This both increases the amount of output generated, as well as 664 changes the format used (more detailed information is given). 669 log_formatter_debug = logging.Formatter(
"[%(levelname)s] " \
679 log_handler = self.
logger.handlers[0]
680 log_handler.setFormatter(log_formatter_debug)
688 "Switch to quiet mode: less verbose." 697 """Switch on `force mode' in which case we don't brake for nobody. 699 In so-called `force mode' all sanity checks are performed but 700 we don't halt on failure. Of course this requires some care 713 """Set the harvesting type to be used. 715 This checks that no harvesting type is already set, and sets 716 the harvesting type to be used to the one specified. If a 717 harvesting type is already set an exception is thrown. The 718 same happens when an unknown type is specified. 727 value = value.lower()
730 type_index = harvesting_types_lowered.index(value)
734 self.
logger.fatal(
"Unknown harvesting type `%s'" % \
736 self.
logger.fatal(
" possible types are: %s" %
738 raise Usage(
"Unknown harvesting type `%s'" % \
744 msg =
"Only one harvesting type should be specified" 749 self.
logger.
info(
"Harvesting type to be used: `%s'" % \
757 """Set the harvesting mode to be used. 759 Single-step harvesting can be used for samples that are 760 located completely at a single site (= SE). Otherwise use 766 harvesting_mode = value.lower()
768 msg =
"Unknown harvesting mode `%s'" % harvesting_mode
770 self.
logger.fatal(
" possible modes are: %s" % \
777 msg =
"Only one harvesting mode should be specified" 782 self.
logger.
info(
"Harvesting mode to be used: `%s'" % \
790 """Set the GlobalTag to be used, overriding our own choices. 792 By default the cmsHarvester will use the GlobalTag with which 793 a given dataset was created also for the harvesting. The 794 --globaltag option is the way to override this behaviour. 800 msg =
"Only one GlobalTag should be specified" 805 self.
logger.
info(
"GlobalTag to be used: `%s'" % \
813 "Switch use of all reference histograms off." 817 self.
logger.
warning(
"Switching off all use of reference histograms")
825 """Override the default Frontier connection string. 827 Please only use this for testing (e.g. when a test payload has 828 been inserted into cms_orc_off instead of cms_orc_on). 830 This method gets called for three different command line 832 - --frontier-connection, 833 - --frontier-connection-for-globaltag, 834 - --frontier-connection-for-refhists. 835 Appropriate care has to be taken to make sure things are only 841 frontier_type = opt_str.split(
"-")[-1]
842 if frontier_type ==
"connection":
846 frontier_types = [frontier_type]
850 for connection_name
in frontier_types:
852 msg =
"Please specify either:\n" \
853 " `--frontier-connection' to change the " \
854 "Frontier connection used for everything, or\n" \
855 "either one or both of\n" \
856 " `--frontier-connection-for-globaltag' to " \
857 "change the Frontier connection used for the " \
859 " `--frontier-connection-for-refhists' to change " \
860 "the Frontier connection used for the " \
861 "reference histograms." 865 frontier_prefix =
"frontier://" 866 if not value.startswith(frontier_prefix):
867 msg =
"Expecting Frontier connections to start with " \
868 "`%s'. You specified `%s'." % \
869 (frontier_prefix, value)
874 if value.find(
"FrontierProd") < 0
and \
875 value.find(
"FrontierProd") < 0:
876 msg =
"Expecting Frontier connections to contain either " \
877 "`FrontierProd' or `FrontierPrep'. You specified " \
878 "`%s'. Are you sure?" % \
882 if not value.endswith(
"/"):
885 for connection_name
in frontier_types:
889 frontier_type_str =
"unknown" 890 if connection_name ==
"globaltag":
891 frontier_type_str =
"the GlobalTag" 892 elif connection_name ==
"refhists":
893 frontier_type_str =
"the reference histograms" 896 "connection for %s " \
        if opt_str.lower().find("ignore") > -1:
            spec_type = "ignore"

        if opt_str.lower().find("dataset") > -1:
            select_type = "datasets"

        if not self.input_method[select_type][spec_type] is None:
            msg = "Please only specify one input method " \
                  "(for the `%s' case)" % opt_str

        input_method = opt_str.replace("-", "").replace("ignore", "")
        self.input_method[select_type][spec_type] = input_method
        self.input_name[select_type][spec_type] = value

        self.logger.debug("Input method for the `%s' case: %s" % \
                          (spec_type, input_method))
963 """Store the name of the file to be used for book keeping. 965 The only check done here is that only a single book keeping 973 msg =
"Only one book keeping file should be specified" 978 self.
logger.
info(
"Book keeping file to be used: `%s'" % \
986 """Store the name of the file for the ref. histogram mapping. 993 msg =
"Only one reference histogram mapping file " \
994 "should be specified" 999 self.
logger.
info(
"Reference histogram mapping file " \
1000 "to be used: `%s'" % \
1062 """Specify where on CASTOR the output should go. 1064 At the moment only output to CERN CASTOR is 1065 supported. Eventually the harvested results should go into the 1066 central place for DQM on CASTOR anyway. 1073 castor_prefix = self.castor_prefix
1076 castor_dir = os.path.join(os.path.sep, castor_dir)
1077 self.castor_base_dir = os.path.normpath(castor_dir)
1079 self.logger.
info(
"CASTOR (base) area to be used: `%s'" % \
1080 self.castor_base_dir)
1087 """Set the self.no_t1access flag to try and create jobs that 1088 run without special `t1access' role. 1092 self.non_t1access =
True 1094 self.logger.
warning(
"Running in `non-t1access' mode. " \
1095 "Will try to create jobs that run " \
1096 "without special rights but no " \
1097 "further promises...")
1104 """Set the self.caf_access flag to try and create jobs that 1108 self.caf_access =
True 1110 self.logger.
warning(
"Running in `caf_access' mode. " \
1111 "Will try to create jobs that run " \
1113 "further promises...")
1120 """Set process.dqmSaver.saveByLumiSectiont=1 in cfg harvesting file 1122 self.saveByLumiSection =
True 1124 self.logger.
warning(
"waning concerning saveByLumiSection option")
1132 """Crab jobs are not created and 1133 "submitted automatically", 1135 self.crab_submission =
True 1143 self.nr_max_sites = value
1149 self.preferred_site = value
1154 """List all harvesting types and their mappings. 1156 This lists all implemented harvesting types with their 1157 corresponding mappings to sequence names. This had to be 1158 separated out from the help since it depends on the CMSSW 1159 version and was making things a bit of a mess. 1161 NOTE: There is no way (at least not that I could come up with) 1162 to code this in a neat generic way that can be read both by 1163 this method and by setup_harvesting_info(). Please try hard to 1164 keep these two methods in sync! 1169 sep_line_short =
"-" * 20
1172 print(
"The following harvesting types are available:")
1175 print(
"`RelVal' maps to:")
1176 print(
" pre-3_3_0 : HARVESTING:validationHarvesting")
1177 print(
" 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting")
1178 print(
" Exceptions:")
1179 print(
" 3_3_0_pre1-4 : HARVESTING:validationHarvesting")
1180 print(
" 3_3_0_pre6 : HARVESTING:validationHarvesting")
1181 print(
" 3_4_0_pre1 : HARVESTING:validationHarvesting")
1183 print(sep_line_short)
1185 print(
"`RelValFS' maps to:")
1186 print(
" always : HARVESTING:validationHarvestingFS")
1188 print(sep_line_short)
1190 print(
"`MC' maps to:")
1191 print(
" always : HARVESTING:validationprodHarvesting")
1193 print(sep_line_short)
1195 print(
"`DQMOffline' maps to:")
1196 print(
" always : HARVESTING:dqmHarvesting")
1209 """Fill our dictionary with all info needed to understand 1212 This depends on the CMSSW version since at some point the 1213 names and sequences were modified. 1215 NOTE: There is no way (at least not that I could come up with) 1216 to code this in a neat generic way that can be read both by 1217 this method and by option_handler_list_types(). Please try 1218 hard to keep these two methods in sync! 1222 assert not self.cmssw_version
is None, \
1223 "ERROR setup_harvesting() requires " \
1224 "self.cmssw_version to be set!!!" 1226 harvesting_info = {}
1229 harvesting_info[
"DQMOffline"] = {}
1230 harvesting_info[
"DQMOffline"][
"beamspot"] =
None 1231 harvesting_info[
"DQMOffline"][
"eventcontent"] =
None 1232 harvesting_info[
"DQMOffline"][
"harvesting"] =
"AtRunEnd" 1234 harvesting_info[
"RelVal"] = {}
1235 harvesting_info[
"RelVal"][
"beamspot"] =
None 1236 harvesting_info[
"RelVal"][
"eventcontent"] =
None 1237 harvesting_info[
"RelVal"][
"harvesting"] =
"AtRunEnd" 1239 harvesting_info[
"RelValFS"] = {}
1240 harvesting_info[
"RelValFS"][
"beamspot"] =
None 1241 harvesting_info[
"RelValFS"][
"eventcontent"] =
None 1242 harvesting_info[
"RelValFS"][
"harvesting"] =
"AtRunEnd" 1244 harvesting_info[
"MC"] = {}
1245 harvesting_info[
"MC"][
"beamspot"] =
None 1246 harvesting_info[
"MC"][
"eventcontent"] =
None 1247 harvesting_info[
"MC"][
"harvesting"] =
"AtRunEnd" 1257 assert self.cmssw_version.startswith(
"CMSSW_")
1260 version = self.cmssw_version[6:]
1266 if version <
"3_3_0":
1267 step_string =
"validationHarvesting" 1268 elif version
in [
"3_3_0_pre1",
"3_3_0_pre2",
1269 "3_3_0_pre3",
"3_3_0_pre4",
1270 "3_3_0_pre6",
"3_4_0_pre1"]:
1271 step_string =
"validationHarvesting" 1273 step_string =
"validationHarvesting+dqmHarvesting" 1275 harvesting_info[
"RelVal"][
"step_string"] = step_string
1279 assert not step_string
is None, \
1280 "ERROR Could not decide a RelVal harvesting sequence " \
1281 "for CMSSW version %s" % self.cmssw_version
1287 step_string =
"validationHarvestingFS" 1289 harvesting_info[
"RelValFS"][
"step_string"] = step_string
1294 step_string =
"validationprodHarvesting" 1296 harvesting_info[
"MC"][
"step_string"] = step_string
1300 assert not step_string
is None, \
1301 "ERROR Could not decide a MC harvesting " \
1302 "sequence for CMSSW version %s" % self.cmssw_version
1308 step_string =
"dqmHarvesting" 1310 harvesting_info[
"DQMOffline"][
"step_string"] = step_string
1314 self.harvesting_info = harvesting_info
1316 self.logger.
info(
"Based on the CMSSW version (%s) " \
1317 "I decided to use the `HARVESTING:%s' " \
1318 "sequence for %s harvesting" % \
1319 (self.cmssw_version,
1320 self.harvesting_info[self.harvesting_type][
"step_string"],
1321 self.harvesting_type))
1328 """Build the common part of the output path to be used on 1331 This consists of the CASTOR area base path specified by the 1332 user and a piece depending on the data type (data vs. MC), the 1333 harvesting type and the dataset name followed by a piece 1334 containing the run number and event count. (See comments in 1335 create_castor_path_name_special for details.) This method 1336 creates the common part, without run number and event count. 1340 castor_path = self.castor_base_dir
1345 datatype = self.datasets_information[dataset_name][
"datatype"]
1346 datatype = datatype.lower()
1347 castor_path = os.path.join(castor_path, datatype)
1350 harvesting_type = self.harvesting_type
1351 harvesting_type = harvesting_type.lower()
1352 castor_path = os.path.join(castor_path, harvesting_type)
1362 release_version = self.cmssw_version
1363 release_version = release_version.lower(). \
1366 castor_path = os.path.join(castor_path, release_version)
1369 dataset_name_escaped = self.escape_dataset_name(dataset_name)
1370 castor_path = os.path.join(castor_path, dataset_name_escaped)
1374 castor_path = os.path.normpath(castor_path)
1382 dataset_name, run_number,
1383 castor_path_common):
1384 """Create the specialised part of the CASTOR output dir name. 1386 NOTE: To avoid clashes with `incremental harvesting' 1387 (re-harvesting when a dataset grows) we have to include the 1388 event count in the path name. The underlying `problem' is that 1389 CRAB does not overwrite existing output files so if the output 1390 file already exists CRAB will fail to copy back the output. 1392 NOTE: It's not possible to create different kinds of 1393 harvesting jobs in a single call to this tool. However, in 1394 principle it could be possible to create both data and MC jobs 1397 NOTE: The number of events used in the path name is the 1398 _total_ number of events in the dataset/run at the time of 1399 harvesting. If we're doing partial harvesting the final 1400 results will reflect lower statistics. This is a) the easiest 1401 to code and b) the least likely to lead to confusion if 1402 someone ever decides to swap/copy around file blocks between 1407 castor_path = castor_path_common
1412 castor_path = os.path.join(castor_path,
"run_%d" % run_number)
1420 castor_path = os.path.join(castor_path,
"nevents")
1424 castor_path = os.path.normpath(castor_path)
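        # Putting the pieces above together, the resulting output directories
        # look roughly like this (illustrative only):
        #   <castor_base_dir>/<datatype>/<harvesting_type>/<release>/
        #   <escaped_dataset_name>/run_<run_number>/nevents...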
1432 """Make sure all required CASTOR output dirs exist. 1434 This checks the CASTOR base dir specified by the user as well 1435 as all the subdirs required by the current set of jobs. 1439 self.logger.
info(
"Checking (and if necessary creating) CASTOR " \
1440 "output area(s)...")
1443 self.create_and_check_castor_dir(self.castor_base_dir)
1447 for (dataset_name, runs)
in self.datasets_to_use.
items():
1450 castor_dirs.append(self.datasets_information[dataset_name] \
1451 [
"castor_path"][run])
1452 castor_dirs_unique = sorted(set(castor_dirs))
1456 ndirs = len(castor_dirs_unique)
1457 step =
max(ndirs / 10, 1)
1458 for (i, castor_dir)
in enumerate(castor_dirs_unique):
1459 if (i + 1) % step == 0
or \
1461 self.logger.
info(
" %d/%d" % \
1463 self.create_and_check_castor_dir(castor_dir)
            self.logger.debug("Checking if path `%s' is empty" % \
                              castor_dir)
            cmd = "rfdir %s" % castor_dir
            (status, output) = subprocess.getstatusoutput(cmd)

            msg = "Could not access directory `%s'" \
                  " !!! This is bad since I should have just" \
                  " created it !!!" % castor_dir
            self.logger.fatal(msg)

            self.logger.warning("Output directory `%s' is not empty:" \
                                " new jobs will fail to" \
                                " copy back output" % \
                                castor_dir)
1491 """Check existence of the give CASTOR dir, if necessary create 1494 Some special care has to be taken with several things like 1495 setting the correct permissions such that CRAB can store the 1496 output results. Of course this means that things like 1497 /castor/cern.ch/ and user/j/ have to be recognised and treated 1500 NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for 1503 NOTE: This method uses some slightly tricky caching to make 1504 sure we don't keep over and over checking the same base paths. 1511 def split_completely(path):
1512 (parent_path, name) = os.path.split(path)
1514 return (parent_path, )
1516 return split_completely(parent_path) + (name, )
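        # For example, split_completely("/castor/cern.ch/cms") returns
        # ("/", "castor", "cern.ch", "cms"): every path component from the
        # root down, which the checks below walk one piece at a time.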
        def extract_permissions(rfstat_output):
            """Parse the output from rfstat and return the
            5-digit permissions string."""

            permissions_line = [i for i in rfstat_output.split("\n") \
                                if i.lower().find("protection") > -1]
            regexp = re.compile(r".*\(([0123456789]{5})\).*")
            match = regexp.search(rfstat_output)
            if not match or len(match.groups()) != 1:
                msg = "Could not extract permissions " \
                      "from output: %s" % rfstat_output
                self.logger.fatal(msg)

            permissions = match.group(1)
            return permissions
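        # For example, an rfstat output line such as
        #   Protection               : -rwxrwxr-x (40775)
        # yields the permissions string "40775" (hedged: the exact rfstat
        # layout may differ, the code only relies on the `Protection' keyword
        # and the parenthesised five digits).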
        castor_paths_dont_touch = {
            0: ["/", "castor", "cern.ch", "cms", "store", "temp",
                "dqm", "offline", "user"],
            -1: ["user", "store"]
            }

        self.logger.debug("Checking CASTOR path `%s'" % castor_dir)
        castor_path_pieces = split_completely(castor_dir)

        check_sizes = sorted(castor_paths_dont_touch.keys())
        len_castor_path_pieces = len(castor_path_pieces)
        for piece_index in range(len_castor_path_pieces):
            skip_this_path_piece = False
            piece = castor_path_pieces[piece_index]
            for check_size in check_sizes:
                if (piece_index + check_size) > -1:
                    if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
                        skip_this_path_piece = True

            path = os.path.join(path, piece)

            try:
                if path in self.castor_path_checks_cache:
                    continue
            except AttributeError:
                self.castor_path_checks_cache = []
            self.castor_path_checks_cache.append(path)

            if not skip_this_path_piece:
                self.logger.debug("Checking if path `%s' exists" % \
                                  path)
                cmd = "rfstat %s" % path
                (status, output) = subprocess.getstatusoutput(cmd)

                self.logger.debug("Creating path `%s'" % path)
                cmd = "nsmkdir -m 775 %s" % path
                (status, output) = subprocess.getstatusoutput(cmd)

                msg = "Could not create directory `%s'" % path
                self.logger.fatal(msg)

                cmd = "rfstat %s" % path
                (status, output) = subprocess.getstatusoutput(cmd)

                permissions = extract_permissions(output)
                if not permissions.startswith("40"):
                    msg = "Path `%s' is not a directory(?)" % path
                    self.logger.fatal(msg)

            self.logger.debug("Checking permissions for path `%s'" % path)
            cmd = "rfstat %s" % path
            (status, output) = subprocess.getstatusoutput(cmd)

            msg = "Could not obtain permissions for directory `%s'" % \
                  path
            self.logger.fatal(msg)

            permissions = extract_permissions(output)[-3:]

            if piece_index == (len_castor_path_pieces - 1):
                permissions_target = "775"
            else:
                permissions_target = "775"

            permissions_new = []
            for (i, j) in zip(permissions, permissions_target):
                permissions_new.append(str(max(int(i), int(j))))
            permissions_new = "".join(permissions_new)
            self.logger.debug("  current permissions: %s" % \
                              permissions)
            self.logger.debug("  target permissions : %s" % \
                              permissions_target)
            if permissions_new != permissions:
                self.logger.debug("Changing permissions of `%s' " \
                                  "to %s (were %s)" % \
                                  (path, permissions_new, permissions))
                cmd = "rfchmod %s %s" % (permissions_new, path)
                (status, output) = subprocess.getstatusoutput(cmd)

                msg = "Could not change permissions for path `%s' " \
                      "to %s" % (path, permissions_new)
                self.logger.fatal(msg)
            else:
                self.logger.debug("  Permissions ok (%s)" % permissions_new)
        sites_forbidden = []

        if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
            self.caf_access = True

        if self.caf_access == False:
            sites_forbidden.append("caf.cern.ch")

        all_t1 = [
            "cmssrm-fzk.gridka.de",
            "gridka-dCache.fzk.de",
            "srm-cms.gridpp.rl.ac.uk",
            "srm.grid.sinica.edu.tw",
            "srm2.grid.sinica.edu.tw",
            "storm-fe-cms.cr.cnaf.infn.it"
            ]

        country_codes = {
            "CAF" : "caf.cern.ch",
            "CH"  : "srm-cms.cern.ch",
            "FR"  : "ccsrm.in2p3.fr",
            "DE"  : "cmssrm-fzk.gridka.de",
            "GOV" : "cmssrm.fnal.gov",
            "DE2" : "gridka-dCache.fzk.de",
            "UK"  : "srm-cms.gridpp.rl.ac.uk",
            "TW"  : "srm.grid.sinica.edu.tw",
            "TW2" : "srm2.grid.sinica.edu.tw",
            "ES"  : "srmcms.pic.es",
            "IT"  : "storm-fe-cms.cr.cnaf.infn.it"
            }

        if self.non_t1access:
            sites_forbidden.extend(all_t1)

        for site in sites_forbidden:
            if site in sites:
                sites.remove(site)

        if self.preferred_site in country_codes:
            self.preferred_site = country_codes[self.preferred_site]

        if self.preferred_site != "no preference":
            if self.preferred_site in sites:
                sites = [self.preferred_site]

        while len(sites) > 0 and \
              site_name is None:

            t1_sites = []
            for site in sites:
                if site in all_t1:
                    t1_sites.append(site)
                if site == "caf.cern.ch":
                    t1_sites.append(site)

            if len(t1_sites) > 0:
                se_name = choice(t1_sites)
            else:
                se_name = choice(sites)

            if se_name in self.sites_and_versions_cache and \
               cmssw_version in self.sites_and_versions_cache[se_name]:
                if self.sites_and_versions_cache[se_name][cmssw_version]:
                    site_name = se_name
                else:
                    self.logger.debug("  --> rejecting site `%s'" % se_name)
                    sites.remove(se_name)
            else:
                self.logger.info("Checking if site `%s' " \
                                 "has CMSSW version `%s'" % \
                                 (se_name, cmssw_version))
                self.sites_and_versions_cache[se_name] = {}

                cmd = "lcg-info --list-ce " \
                      "CEStatus=Production," \
                      (cmssw_version, se_name)
                (status, output) = subprocess.getstatusoutput(cmd)

                self.logger.error("Could not check site information " \
                                  "for site `%s'" % se_name)

                if (len(output) > 0) or (se_name == "caf.cern.ch"):
                    self.sites_and_versions_cache[se_name][cmssw_version] = True
                else:
                    self.sites_and_versions_cache[se_name][cmssw_version] = False
                    self.logger.debug("  --> rejecting site `%s'" % se_name)
                    sites.remove(se_name)

        if site_name is self.no_matching_site_found_str:
            self.logger.error("  --> no matching site found")
            self.logger.error("  --> Your release or SCRAM " \
                              "architecture may not be available " \
                              "anywhere on the (LCG) grid.")
            self.logger.debug(" (command used: `%s')" % cmd)
        else:
            self.logger.debug("  --> selected site `%s'" % site_name)

        if site_name is None:
            site_name = self.no_matching_site_found_str
            self.all_sites_found = False
        parser = optparse.OptionParser(version="%s %s" % \
                                       ("%prog", self.version))

        self.option_parser = parser

        parser.add_option("-d", "--debug",
                          help="Switch on debug mode",
                          callback=self.option_handler_debug)

        parser.add_option("-q", "--quiet",
                          help="Be less verbose",
                          callback=self.option_handler_quiet)

        parser.add_option("", "--force",
                          help="Force mode. Do not abort on sanity check "
                               "failures",
                          callback=self.option_handler_force)

        parser.add_option("", "--harvesting_type",
                          help="Harvesting type: %s" % \
                               ", ".join(self.harvesting_types),
                          callback=self.option_handler_harvesting_type,
                          metavar="HARVESTING_TYPE")

        parser.add_option("", "--harvesting_mode",
                          help="Harvesting mode: %s (default = %s)" % \
                               (", ".join(self.harvesting_modes),
                                self.harvesting_mode_default),
                          callback=self.option_handler_harvesting_mode,
                          metavar="HARVESTING_MODE")

        parser.add_option("", "--globaltag",
                          help="GlobalTag to use. Default is the one " \
                               "the dataset was created with for MC; for data " \
                               "a GlobalTag has to be specified.",
                          callback=self.option_handler_globaltag,
                          metavar="GLOBALTAG")

        parser.add_option("", "--no-ref-hists",
                          help="Don't use any reference histograms",
                          callback=self.option_handler_no_ref_hists)

        parser.add_option("", "--frontier-connection",
                          help="Use this Frontier connection to find " \
                               "GlobalTags and LocalTags (for reference " \
                               "histograms).\nPlease only use this for " \
                               "testing.",
                          callback=self.option_handler_frontier_connection)

        parser.add_option("", "--frontier-connection-for-globaltag",
                          help="Use this Frontier connection to find " \
                               "GlobalTags.\nPlease only use this for " \
                               "testing.",
                          callback=self.option_handler_frontier_connection)

        parser.add_option("", "--frontier-connection-for-refhists",
                          help="Use this Frontier connection to find " \
                               "LocalTags (for reference " \
                               "histograms).\nPlease only use this for " \
                               "testing.",
                          callback=self.option_handler_frontier_connection)

        parser.add_option("", "--dataset",
                          help="Name (or regexp) of dataset(s) to process",
                          callback=self.option_handler_input_spec)

        parser.add_option("", "--dataset-ignore",
                          help="Name (or regexp) of dataset(s) to ignore",
                          callback=self.option_handler_input_spec,
                          metavar="DATASET-IGNORE")

        parser.add_option("", "--runs",
                          help="Run number(s) to process",
                          callback=self.option_handler_input_spec)

        parser.add_option("", "--runs-ignore",
                          help="Run number(s) to ignore",
                          callback=self.option_handler_input_spec,
                          metavar="RUNS-IGNORE")

        parser.add_option("", "--datasetfile",
                          help="File containing list of dataset names " \
                               "(or regexps) to process",
                          callback=self.option_handler_input_spec,
                          metavar="DATASETFILE")

        parser.add_option("", "--datasetfile-ignore",
                          help="File containing list of dataset names " \
                               "(or regexps) to ignore",
                          callback=self.option_handler_input_spec,
                          metavar="DATASETFILE-IGNORE")

        parser.add_option("", "--runslistfile",
                          help="File containing list of run numbers " \
                               "to process",
                          callback=self.option_handler_input_spec,
                          metavar="RUNSLISTFILE")

        parser.add_option("", "--runslistfile-ignore",
                          help="File containing list of run numbers " \
                               "to ignore",
                          callback=self.option_handler_input_spec,
                          metavar="RUNSLISTFILE-IGNORE")

        parser.add_option("", "--Jsonrunfile",
                          help="Jsonfile containing dictionary of run/lumisection pairs. " \
                               "All lumisections of runs contained in the dictionary are processed.",
                          callback=self.option_handler_input_Jsonrunfile,
                          metavar="JSONRUNFILE")

        parser.add_option("", "--Jsonfile",
                          help="Jsonfile containing dictionary of run/lumisection pairs. " \
                               "Only the specified lumisections of runs contained in the dictionary are processed.",
                          callback=self.option_handler_input_Jsonfile)

        parser.add_option("", "--todo-file",
                          help="Todo file containing a list of runs to process.",
                          callback=self.option_handler_input_todofile,
                          metavar="TODO-FILE")

        parser.add_option("", "--refhistmappingfile",
                          help="File to be used for the reference " \
                               "histogram mappings. Default: `%s'." % \
                               self.ref_hist_mappings_file_name_default,
                          callback=self.option_handler_ref_hist_mapping_file,
                          metavar="REFHISTMAPPING-FILE")

        parser.add_option("", "--castordir",
                          help="Place on CASTOR to store results. " \
                               "Default: `%s'." % \
                               self.castor_base_dir_default,
                          callback=self.option_handler_castor_dir,
                          metavar="CASTORDIR")

        parser.add_option("", "--no-t1access",
                          help="Try to create jobs that will run " \
                               "without special `t1access' role",
                          callback=self.option_handler_no_t1access)

        parser.add_option("", "--caf-access",
                          help="Crab jobs may run " \
                               "on the CAF",
                          callback=self.option_handler_caf_access)

        parser.add_option("", "--saveByLumiSection",
                          help="Set saveByLumiSection=1 in the harvesting cfg file",
                          callback=self.option_handler_saveByLumiSection)

        parser.add_option("", "--automatic-crab-submission",
                          help="Crab jobs are created and " \
                               "submitted automatically",
                          callback=self.option_handler_crab_submission)

        parser.add_option("", "--max-sites",
                          help="Max. number of sites each job is submitted to",
                          callback=self.option_handler_sites)

        parser.add_option("", "--site",
                          help="Crab jobs are submitted to the specified site. T1 sites may be shortened by the following (country) codes: \
                           srm-cms.cern.ch : CH \
                           ccsrm.in2p3.fr : FR \
                           cmssrm-fzk.gridka.de : DE \
                           cmssrm.fnal.gov : GOV \
                           gridka-dCache.fzk.de : DE2 \
                           srm-cms.gridpp.rl.ac.uk : UK \
                           srm.grid.sinica.edu.tw : TW \
                           srm2.grid.sinica.edu.tw : TW2 \
                           srmcms.pic.es : ES \
                           storm-fe-cms.cr.cnaf.infn.it : IT",
                          callback=self.option_handler_preferred_site)

        parser.add_option("-l", "--list",
                          help="List all harvesting types and their " \
                               "corresponding sequence names",
                          callback=self.option_handler_list_types)

        if len(self.cmd_line_opts) < 1:
            self.cmd_line_opts = ["--help"]

        for i in ["-d", "--debug",
                  "-q", "--quiet"]:
            if i in self.cmd_line_opts:
                self.cmd_line_opts.remove(i)
                self.cmd_line_opts.insert(0, i)

        parser.set_defaults()
        (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
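        # A typical invocation could look like this (hedged sketch; the
        # dataset name and GlobalTag below are made-up examples):
        #
        #   cmsHarvester.py --harvesting_type=DQMOffline \
        #                   --harvesting_mode=single-step \
        #                   --globaltag=GR_R_35X_V8::All \
        #                   --dataset=/MinimumBias/Example-Era-v1/RECO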
2193 """Check completeness and correctness of input information. 2195 Check that all required information has been specified and 2196 that, at least as far as can be easily checked, it makes 2199 NOTE: This is also where any default values are applied. 2203 self.logger.
info(
"Checking completeness/correctness of input...")
2207 if len(self.args) > 0:
2208 msg =
"Sorry but I don't understand `%s'" % \
2209 (
" ".
join(self.args))
2210 self.logger.fatal(msg)
2216 if self.harvesting_mode ==
"two-step":
2217 msg =
"--------------------\n" \
2218 " Sorry, but for the moment (well, till it works)" \
2219 " the two-step mode has been disabled.\n" \
2220 "--------------------\n" 2221 self.logger.fatal(msg)
2226 if self.harvesting_type
is None:
2227 msg =
"Please specify a harvesting type" 2228 self.logger.fatal(msg)
2231 if self.harvesting_mode
is None:
2232 self.harvesting_mode = self.harvesting_mode_default
2233 msg =
"No harvesting mode specified --> using default `%s'" % \
2234 self.harvesting_mode
2241 if self.input_method[
"datasets"][
"use"]
is None:
2242 msg =
"Please specify an input dataset name " \
2243 "or a list file name" 2244 self.logger.fatal(msg)
2249 assert not self.input_name[
"datasets"][
"use"]
is None 2256 if self.use_ref_hists:
2257 if self.ref_hist_mappings_file_name
is None:
2258 self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2259 msg =
"No reference histogram mapping file specified --> " \
2260 "using default `%s'" % \
2261 self.ref_hist_mappings_file_name
2268 if self.castor_base_dir
is None:
2269 self.castor_base_dir = self.castor_base_dir_default
2270 msg =
"No CASTOR area specified -> using default `%s'" % \
2271 self.castor_base_dir
2276 if not self.castor_base_dir.startswith(self.castor_prefix):
2277 msg =
"CASTOR area does not start with `%s'" % \
2279 self.logger.fatal(msg)
2280 if self.castor_base_dir.
find(
"castor") > -1
and \
2281 not self.castor_base_dir.
find(
"cern.ch") > -1:
2282 self.logger.fatal(
"Only CERN CASTOR is supported")
2293 if self.globaltag
is None:
2294 self.logger.
warning(
"No GlobalTag specified. This means I cannot")
2295 self.logger.
warning(
"run on data, only on MC.")
2296 self.logger.
warning(
"I will skip all data datasets.")
2301 if not self.globaltag
is None:
2302 if not self.globaltag.endswith(
"::All"):
2303 self.logger.
warning(
"Specified GlobalTag `%s' does " \
2304 "not end in `::All' --> " \
2305 "appending this missing piece" % \
2307 self.globaltag =
"%s::All" % self.globaltag
2312 for (key, value)
in self.frontier_connection_name.
items():
2313 frontier_type_str =
"unknown" 2314 if key ==
"globaltag":
2315 frontier_type_str =
"the GlobalTag" 2316 elif key ==
"refhists":
2317 frontier_type_str =
"the reference histograms" 2319 if self.frontier_connection_overridden[key] ==
True:
2323 self.logger.
info(
"Using %sdefault Frontier " \
2324 "connection for %s: `%s'" % \
2325 (non_str, frontier_type_str, value))
2334 """Check if CMSSW is setup. 2341 cmssw_version = os.getenv(
"CMSSW_VERSION")
2342 if cmssw_version
is None:
2343 self.logger.fatal(
"It seems CMSSW is not setup...")
2344 self.logger.fatal(
"($CMSSW_VERSION is empty)")
2345 raise Error(
"ERROR: CMSSW needs to be setup first!")
2347 self.cmssw_version = cmssw_version
2348 self.logger.
info(
"Found CMSSW version %s properly set up" % \
2357 """Check if DBS is setup. 2364 dbs_home = os.getenv(
"DBSCMD_HOME")
2365 if dbs_home
is None:
2366 self.logger.fatal(
"It seems DBS is not setup...")
2367 self.logger.fatal(
" $DBSCMD_HOME is empty")
2368 raise Error(
"ERROR: DBS needs to be setup first!")
2386 self.logger.
debug(
"Found DBS properly set up")
2394 """Setup the Python side of DBS. 2396 For more information see the DBS Python API documentation: 2397 https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation 2403 args[
"url"]=
"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2404 "servlet/DBSServlet" 2408 except DBSAPI.dbsApiException.DbsApiException
as ex:
2409 self.logger.fatal(
"Caught DBS API exception %s: %s " % \
2410 (ex.getClassName(), ex.getErrorMessage()))
2411 if ex.getErrorCode()
not in (
None,
""):
2412 logger.debug(
"DBS exception error code: ", ex.getErrorCode())
2420 """Use DBS to resolve a wildcarded dataset name. 2426 assert not self.dbs_api
is None 2431 if not (dataset_name.startswith(
"/")
and \
2432 dataset_name.endswith(
"RECO")):
2433 self.logger.
warning(
"Dataset name `%s' does not sound " \
2434 "like a valid dataset name!" % \
2440 dbs_query =
"find dataset where dataset like %s " \
2441 "and dataset.status = VALID" % \
2444 api_result = api.executeQuery(dbs_query)
2445 except DBSAPI.dbsApiException.DbsApiException:
2446 msg =
"ERROR: Could not execute DBS query" 2447 self.logger.fatal(msg)
2452 parser = xml.sax.make_parser()
2453 parser.setContentHandler(handler)
2457 xml.sax.parseString(api_result, handler)
2458 except SAXParseException:
2459 msg =
"ERROR: Could not parse DBS server output" 2460 self.logger.fatal(msg)
2464 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2468 datasets = handler.results.values()[0]
2476 """Ask DBS for the CMSSW version used to create this dataset. 2482 assert not self.dbs_api
is None 2486 dbs_query =
"find algo.version where dataset = %s " \
2487 "and dataset.status = VALID" % \
2490 api_result = api.executeQuery(dbs_query)
2491 except DBSAPI.dbsApiException.DbsApiException:
2492 msg =
"ERROR: Could not execute DBS query" 2493 self.logger.fatal(msg)
2497 parser = xml.sax.make_parser()
2498 parser.setContentHandler(handler)
2501 xml.sax.parseString(api_result, handler)
2502 except SAXParseException:
2503 msg =
"ERROR: Could not parse DBS server output" 2504 self.logger.fatal(msg)
2508 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2511 cmssw_version = handler.results.values()[0]
2514 assert len(cmssw_version) == 1
2517 cmssw_version = cmssw_version[0]
2520 return cmssw_version
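    # The query/parse/validate pattern above recurs in all dbs_resolve_*
    # methods. A hedged sketch of how it could be factored out (not part of
    # the original tool; `keys' is a hypothetical parameter listing the
    # result columns the handler should collect):
    #
    #   def _dbs_query(self, dbs_query, keys):
    #       handler = DBSXMLHandler(keys)
    #       parser = xml.sax.make_parser()
    #       parser.setContentHandler(handler)
    #       api_result = self.dbs_api.executeQuery(dbs_query)
    #       xml.sax.parseString(api_result, handler)
    #       assert handler.check_results_validity()
    #       return handler.results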
2570 """Ask DBS for the list of runs in a given dataset. 2572 # NOTE: This does not (yet?) skip/remove empty runs. There is 2573 # a bug in the DBS entry run.numevents (i.e. it always returns 2574 # zero) which should be fixed in the `next DBS release'. 2576 # https://savannah.cern.ch/bugs/?53452 2577 # https://savannah.cern.ch/bugs/?53711 2588 assert not self.dbs_api
is None 2592 dbs_query =
"find run where dataset = %s " \
2593 "and dataset.status = VALID" % \
2596 api_result = api.executeQuery(dbs_query)
2597 except DBSAPI.dbsApiException.DbsApiException:
2598 msg =
"ERROR: Could not execute DBS query" 2599 self.logger.fatal(msg)
2603 parser = xml.sax.make_parser()
2604 parser.setContentHandler(handler)
2607 xml.sax.parseString(api_result, handler)
2608 except SAXParseException:
2609 msg =
"ERROR: Could not parse DBS server output" 2610 self.logger.fatal(msg)
2614 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2617 runs = handler.results.values()[0]
2619 runs = sorted([
int(i)
for i
in runs])
2627 """Ask DBS for the globaltag corresponding to a given dataset. 2630 # This does not seem to work for data datasets? E.g. for 2631 # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO 2632 # Probaly due to the fact that the GlobalTag changed during 2640 assert not self.dbs_api
is None 2644 dbs_query =
"find dataset.tag where dataset = %s " \
2645 "and dataset.status = VALID" % \
2648 api_result = api.executeQuery(dbs_query)
2649 except DBSAPI.dbsApiException.DbsApiException:
2650 msg =
"ERROR: Could not execute DBS query" 2651 self.logger.fatal(msg)
2655 parser = xml.sax.make_parser()
2656 parser.setContentHandler(parser)
2659 xml.sax.parseString(api_result, handler)
2660 except SAXParseException:
2661 msg =
"ERROR: Could not parse DBS server output" 2662 self.logger.fatal(msg)
2666 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2669 globaltag = handler.results.values()[0]
2672 assert len(globaltag) == 1
2675 globaltag = globaltag[0]
2683 """Ask DBS for the the data type (data or mc) of a given 2690 assert not self.dbs_api
is None 2694 dbs_query =
"find datatype.type where dataset = %s " \
2695 "and dataset.status = VALID" % \
2698 api_result = api.executeQuery(dbs_query)
2699 except DBSAPI.dbsApiException.DbsApiException:
2700 msg =
"ERROR: Could not execute DBS query" 2701 self.logger.fatal(msg)
2705 parser = xml.sax.make_parser()
2706 parser.setContentHandler(handler)
2709 xml.sax.parseString(api_result, handler)
2710 except SAXParseException:
2711 msg =
"ERROR: Could not parse DBS server output" 2712 self.logger.fatal(msg)
2716 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2719 datatype = handler.results.values()[0]
2722 assert len(datatype) == 1
2725 datatype = datatype[0]
2736 """Determine the number of events in a given dataset (and run). 2738 Ask DBS for the number of events in a dataset. If a run number 2739 is specified the number of events returned is that in that run 2740 of that dataset. If problems occur we throw an exception. 2743 # Since DBS does not return the number of events correctly, 2744 # neither for runs nor for whole datasets, we have to work 2745 # around that a bit... 2752 assert not self.dbs_api
is None 2756 dbs_query =
"find file.name, file.numevents where dataset = %s " \
2757 "and dataset.status = VALID" % \
2759 if not run_number
is None:
2760 dbs_query = dbq_query + (
" and run = %d" % run_number)
2762 api_result = api.executeQuery(dbs_query)
2763 except DBSAPI.dbsApiException.DbsApiException:
2764 msg =
"ERROR: Could not execute DBS query" 2765 self.logger.fatal(msg)
2769 parser = xml.sax.make_parser()
2770 parser.setContentHandler(handler)
2773 xml.sax.parseString(api_result, handler)
2774 except SAXParseException:
2775 msg =
"ERROR: Could not parse DBS server output" 2776 self.logger.fatal(msg)
2780 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2783 num_events = sum(handler.results[
"file.numevents"])
3077 """Figure out the number of events in each run of this dataset. 3079 This is a more efficient way of doing this than calling 3080 dbs_resolve_number_of_events for each run. 3084 self.logger.
debug(
"Checking spread of dataset `%s'" % dataset_name)
3088 assert not self.dbs_api
is None 3092 dbs_query =
"find run.number, site, file.name, file.numevents " \
3093 "where dataset = %s " \
3094 "and dataset.status = VALID" % \
3097 api_result = api.executeQuery(dbs_query)
3098 except DBSAPI.dbsApiException.DbsApiException:
3099 msg =
"ERROR: Could not execute DBS query" 3100 self.logger.fatal(msg)
3103 handler =
DBSXMLHandler([
"run.number",
"site",
"file.name",
"file.numevents"])
3104 parser = xml.sax.make_parser()
3105 parser.setContentHandler(handler)
3148 xml.sax.parseString(api_result, handler)
3149 except SAXParseException:
3150 msg =
"ERROR: Could not parse DBS server output" 3151 self.logger.fatal(msg)
3155 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 3162 for (index, site_name)
in enumerate(handler.results[
"site"]):
3172 if len(site_name) < 1:
3174 run_number =
int(handler.results[
"run.number"][index])
3175 file_name = handler.results[
"file.name"][index]
3176 nevents =
int(handler.results[
"file.numevents"][index])
3179 if run_number
not in files_info:
3181 files_info[run_number] = {}
3182 files_info[run_number][file_name] = (nevents,
3184 elif file_name
not in files_info[run_number]:
3186 files_info[run_number][file_name] = (nevents,
3193 assert nevents == files_info[run_number][file_name][0]
3195 files_info[run_number][file_name][1].
append(site_name)
3200 for run_number
in files_info.keys():
3201 files_without_sites = [i
for (i, j)
in \
3202 files_info[run_number].
items() \
3204 if len(files_without_sites) > 0:
3205 self.logger.
warning(
"Removing %d file(s)" \
3206 " with empty site names" % \
3207 len(files_without_sites))
3208 for file_name
in files_without_sites:
3209 del files_info[run_number][file_name]
3215 num_events_catalog = {}
3216 for run_number
in files_info.keys():
3217 site_names = list(set([j
for i
in files_info[run_number].
values()
for j
in i[1]]))
3223 if len(site_names) > 1:
3231 all_file_names = files_info[run_number].
keys()
3232 all_file_names = set(all_file_names)
3233 sites_with_complete_copies = []
3234 for site_name
in site_names:
3235 files_at_site = [i
for (i, (j, k)) \
3236 in files_info[run_number].
items() \
3238 files_at_site = set(files_at_site)
3239 if files_at_site == all_file_names:
3240 sites_with_complete_copies.append(site_name)
3241 if len(sites_with_complete_copies) < 1:
3247 if len(sites_with_complete_copies) > 1:
3266 self.logger.
debug(
" -> run appears to be `mirrored'")
3268 self.logger.
debug(
" -> run appears to be spread-out")
3271 len(sites_with_complete_copies) != len(site_names):
3275 for (file_name, (i, sites))
in files_info[run_number].
items():
3276 complete_sites = [site
for site
in sites \
3277 if site
in sites_with_complete_copies]
3278 files_info[run_number][file_name] = (i, complete_sites)
3279 site_names = sites_with_complete_copies
3281 self.logger.
debug(
" for run #%d:" % run_number)
3282 num_events_catalog[run_number] = {}
3283 num_events_catalog[run_number][
"all_sites"] = sum([i[0]
for i
in files_info[run_number].
values()])
3284 if len(site_names) < 1:
3285 self.logger.
debug(
" run is not available at any site")
3286 self.logger.
debug(
" (but should contain %d events" % \
3287 num_events_catalog[run_number][
"all_sites"])
3289 self.logger.
debug(
" at all sites combined there are %d events" % \
3290 num_events_catalog[run_number][
"all_sites"])
3291 for site_name
in site_names:
3292 num_events_catalog[run_number][site_name] = sum([i[0]
for i
in files_info[run_number].
values()
if site_name
in i[1]])
3293 self.logger.
debug(
" at site `%s' there are %d events" % \
3294 (site_name, num_events_catalog[run_number][site_name]))
3295 num_events_catalog[run_number][
"mirrored"] = mirrored
3298 return num_events_catalog
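    # The structure assembled above looks like this:
    #   num_events_catalog[run_number]["all_sites"] : total number of events in the run
    #   num_events_catalog[run_number][site_name]   : events available at that site
    #   num_events_catalog[run_number]["mirrored"]  : True if several sites hold complete copies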
3358 """Build a list of all datasets to be processed. 3367 if input_method
is None:
3369 elif input_method ==
"dataset":
3374 self.logger.
info(
"Asking DBS for dataset names")
3375 dataset_names = self.dbs_resolve_dataset_name(input_name)
3376 elif input_method ==
"datasetfile":
3381 self.logger.
info(
"Reading input from list file `%s'" % \
3384 listfile = open(
"/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name,
"r") 3385 print("open listfile")
3386 for dataset
in listfile:
3388 dataset_stripped = dataset.strip()
3389 if len(dataset_stripped) < 1:
3392 if dataset_stripped[0] !=
"#":
3393 dataset_names.extend(self. \
3397 msg =
"ERROR: Could not open input list file `%s'" % \
3399 self.logger.fatal(msg)
3404 assert False,
"Unknown input method `%s'" % input_method
3412 dataset_names = sorted(set(dataset_names))
3416 return dataset_names
3421 """Build a list of datasets to process. 3425 self.logger.
info(
"Building list of datasets to consider...")
3427 input_method = self.input_method[
"datasets"][
"use"]
3428 input_name = self.input_name[
"datasets"][
"use"]
3429 dataset_names = self.build_dataset_list(input_method,
3431 self.datasets_to_use = dict(list(
zip(dataset_names,
3432 [
None] * len(dataset_names))))
3434 self.logger.
info(
" found %d dataset(s) to process:" % \
3436 for dataset
in dataset_names:
3437 self.logger.
info(
" `%s'" % dataset)
3444 """Build a list of datasets to ignore. 3446 NOTE: We should always have a list of datasets to process, but 3447 it may be that we don't have a list of datasets to ignore. 3451 self.logger.
info(
"Building list of datasets to ignore...")
3453 input_method = self.input_method[
"datasets"][
"ignore"]
3454 input_name = self.input_name[
"datasets"][
"ignore"]
3455 dataset_names = self.build_dataset_list(input_method,
3457 self.datasets_to_ignore = dict(list(
zip(dataset_names,
3458 [
None] * len(dataset_names))))
3460 self.logger.
info(
" found %d dataset(s) to ignore:" % \
3462 for dataset
in dataset_names:
3463 self.logger.
info(
" `%s'" % dataset)
        if input_method is None:
            pass
        elif input_method == "runs":
            self.logger.info("Reading list of runs from the " \
                             "command line")
            runs.extend([int(i.strip()) \
                         for i in input_name.split(",") \
                         if len(i.strip()) > 0])
        elif input_method == "runslistfile":
            self.logger.info("Reading list of runs from file `%s'" % \
                             input_name)
            try:
                listfile = open(input_name, "r")
                for run in listfile:
                    run_stripped = run.strip()
                    if len(run_stripped) < 1:
                        continue
                    if run_stripped[0] != "#":
                        runs.append(int(run_stripped))
            except IOError:
                msg = "ERROR: Could not open input list file `%s'" % \
                      input_name
                self.logger.fatal(msg)
        else:
            assert False, "Unknown input method `%s'" % input_method

        runs = list(set(runs))

        """Build a list of runs to process.

        """

        self.logger.info("Building list of runs to consider...")

        input_method = self.input_method["runs"]["use"]
        input_name = self.input_name["runs"]["use"]
        runs = self.build_runs_list(input_method, input_name)
        self.runs_to_use = dict(list(zip(runs, [None] * len(runs))))

        self.logger.info("  found %d run(s) to process:" % \
                         len(runs))
        self.logger.info("  %s" % ", ".join([str(i) for i in runs]))
3542 """Build a list of runs to ignore. 3544 NOTE: We should always have a list of runs to process, but 3545 it may be that we don't have a list of runs to ignore. 3549 self.logger.
info(
"Building list of runs to ignore...")
3551 input_method = self.input_method[
"runs"][
"ignore"]
3552 input_name = self.input_name[
"runs"][
"ignore"]
3553 runs = self.build_runs_list(input_method, input_name)
3554 self.runs_to_ignore = dict(list(
zip(runs, [
None] * len(runs))))
3556 self.logger.
info(
" found %d run(s) to ignore:" % \
3559 self.logger.
info(
" %s" %
", ".
join([
str(i)
for i
in runs]))
3566 """Update the list of datasets taking into account the ones to 3569 Both lists have been generated before from DBS and both are 3570 assumed to be unique. 3572 NOTE: The advantage of creating the ignore list from DBS (in 3573 case a regexp is given) and matching that instead of directly 3574 matching the ignore criterion against the list of datasets (to 3575 consider) built from DBS is that in the former case we're sure 3576 that all regexps are treated exactly as DBS would have done 3577 without the cmsHarvester. 3579 NOTE: This only removes complete samples. Exclusion of single 3580 runs is done by the book keeping. So the assumption is that a 3581 user never wants to harvest just part (i.e. n out of N runs) 3586 self.logger.
info(
"Processing list of datasets to ignore...")
3588 self.logger.
debug(
"Before processing ignore list there are %d " \
3589 "datasets in the list to be processed" % \
3590 len(self.datasets_to_use))
3593 dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3594 for dataset_name
in self.datasets_to_use.
keys():
3595 if dataset_name
in self.datasets_to_ignore.
keys():
3596 del dataset_names_filtered[dataset_name]
3598 self.logger.
info(
" --> Removed %d dataset(s)" % \
3599 (len(self.datasets_to_use) -
3600 len(dataset_names_filtered)))
3602 self.datasets_to_use = dataset_names_filtered
3604 self.logger.
debug(
"After processing ignore list there are %d " \
3605 "datasets in the list to be processed" % \
3606 len(self.datasets_to_use))
3614 self.logger.
info(
"Processing list of runs to use and ignore...")
3625 runs_to_use = self.runs_to_use
3626 runs_to_ignore = self.runs_to_ignore
3628 for dataset_name
in self.datasets_to_use:
3629 runs_in_dataset = self.datasets_information[dataset_name][
"runs"]
3632 runs_to_use_tmp = []
3633 for run
in runs_to_use:
3634 if not run
in runs_in_dataset:
3635 self.logger.
warning(
"Dataset `%s' does not contain " \
3636 "requested run %d " \
3637 "--> ignoring `use' of this run" % \
3638 (dataset_name, run))
3640 runs_to_use_tmp.append(run)
3642 if len(runs_to_use) > 0:
3643 runs = runs_to_use_tmp
3644 self.logger.
info(
"Using %d out of %d runs " \
3645 "of dataset `%s'" % \
3646 (len(runs), len(runs_in_dataset),
3649 runs = runs_in_dataset
3651 if len(runs_to_ignore) > 0:
3654 if not run
in runs_to_ignore:
3655 runs_tmp.append(run)
3656 self.logger.
info(
"Ignoring %d out of %d runs " \
3657 "of dataset `%s'" % \
3658 (len(runs)- len(runs_tmp),
3659 len(runs_in_dataset),
3663 if self.todofile !=
"YourToDofile.txt":
3665 print(
"Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile)
3666 cmd=
"grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3667 (status, output)=subprocess.getstatusoutput(cmd)
3670 if run_str
in output:
3671 runs_todo.append(run)
3672 self.logger.
info(
"Using %d runs " \
3673 "of dataset `%s'" % \
3679 if self.Jsonfilename !=
"YourJSON.txt":
3681 self.Jsonlumi =
True 3684 self.logger.
info(
"Reading runs and lumisections from file `%s'" % \
3687 Jsonfile = open(self.Jsonfilename,
"r") 3688 for names
in Jsonfile:
3689 dictNames= eval(
str(names))
3690 for key
in dictNames:
3692 Json_runs.append(intkey)
3695 msg =
"ERROR: Could not open Jsonfile `%s'" % \
3697 self.logger.fatal(msg)
3700 if run
in Json_runs:
3701 good_runs.append(run)
3702 self.logger.
info(
"Using %d runs " \
3703 "of dataset `%s'" % \
3707 if (self.Jsonrunfilename !=
"YourJSON.txt")
and (self.Jsonfilename ==
"YourJSON.txt"):
3711 self.logger.
info(
"Reading runs from file `%s'" % \
3712 self.Jsonrunfilename)
3714 Jsonfile = open(self.Jsonrunfilename,
"r") 3715 for names
in Jsonfile:
3716 dictNames= eval(
str(names))
3717 for key
in dictNames:
3719 Json_runs.append(intkey)
3722 msg =
"ERROR: Could not open Jsonfile `%s'" % \
3724 self.logger.fatal(msg)
3727 if run
in Json_runs:
3728 good_runs.append(run)
3729 self.logger.
info(
"Using %d runs " \
3730 "of dataset `%s'" % \
3735 self.datasets_to_use[dataset_name] = runs
3742 """Remove all but the largest part of all datasets. 3744 This allows us to harvest at least part of these datasets 3745 using single-step harvesting until the two-step approach 3751 assert self.harvesting_mode ==
"single-step-allow-partial" 3754 for dataset_name
in self.datasets_to_use:
3755 for run_number
in self.datasets_information[dataset_name][
"runs"]:
3756 max_events =
max(self.datasets_information[dataset_name][
"sites"][run_number].
values())
3757 sites_with_max_events = [i[0]
for i
in self.datasets_information[dataset_name][
"sites"][run_number].
items()
if i[1] == max_events]
3758 self.logger.
warning(
"Singlifying dataset `%s', " \
3760 (dataset_name, run_number))
3761 cmssw_version = self.datasets_information[dataset_name] \
3763 selected_site = self.pick_a_site(sites_with_max_events,
3767 nevents_old = self.datasets_information[dataset_name][
"num_events"][run_number]
3769 "only harvesting partial statistics: " \
3770 "%d out of %d events (5.1%f%%) " \
3774 100. * max_events / nevents_old,
3776 self.logger.
warning(
"!!! Please note that the number of " \
3777 "events in the output path name will " \
3778 "NOT reflect the actual statistics in " \
3779 "the harvested results !!!")
3786 self.datasets_information[dataset_name][
"sites"][run_number] = {selected_site: max_events}
3787 self.datasets_information[dataset_name][
"num_events"][run_number] = max_events
3795 """Check list of dataset names for impossible ones. 3797 Two kinds of checks are done: 3798 - Checks for things that do not make sense. These lead to 3799 errors and skipped datasets. 3800 - Sanity checks. For these warnings are issued but the user is 3801 considered to be the authoritative expert. 3804 - The CMSSW version encoded in the dataset name should match 3805 self.cmssw_version. This is critical. 3806 - There should be some events in the dataset/run. This is 3807 critical in the sense that CRAB refuses to create jobs for 3808 zero events. And yes, this does happen in practice. E.g. the 3809 reprocessed CRAFT08 datasets contain runs with zero events. 3810 - A cursory check is performed to see if the harvesting type 3811 makes sense for the data type. This should prevent the user 3812 from inadvertently running RelVal for data. 3813 - It is not possible to run single-step harvesting jobs on 3814 samples that are not fully contained at a single site. 3815 - Each dataset/run has to be available at at least one site. 3819 self.logger.
info(
"Performing sanity checks on dataset list...")
3821 dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3823 for dataset_name
in self.datasets_to_use.
keys():
3826 version_from_dataset = self.datasets_information[dataset_name] \
3828 if version_from_dataset != self.cmssw_version:
3829 msg =
" CMSSW version mismatch for dataset `%s' " \
3832 self.cmssw_version, version_from_dataset)
3833 if self.force_running:
3836 "--> `force mode' active: " \
3839 del dataset_names_after_checks[dataset_name]
3841 "--> skipping" % msg)
3852 datatype = self.datasets_information[dataset_name][
"datatype"]
3853 if datatype ==
"data":
3855 if self.harvesting_type !=
"DQMOffline":
3857 elif datatype ==
"mc":
3858 if self.harvesting_type ==
"DQMOffline":
3862 assert False,
"ERROR Impossible data type `%s' " \
3863 "for dataset `%s'" % \
3864 (datatype, dataset_name)
3866 msg =
" Normally one does not run `%s' harvesting " \
3867 "on %s samples, are you sure?" % \
3868 (self.harvesting_type, datatype)
3869 if self.force_running:
3871 "--> `force mode' active: " \
3874 del dataset_names_after_checks[dataset_name]
3876 "--> skipping" % msg)
3890 if datatype ==
"data":
3891 if self.globaltag
is None:
3892 msg =
"For data datasets (like `%s') " \
3893 "we need a GlobalTag" % \
3895 del dataset_names_after_checks[dataset_name]
3897 "--> skipping" % msg)
3907 globaltag = self.datasets_information[dataset_name][
"globaltag"]
3908 if not globaltag
in self.globaltag_check_cache:
3909 if self.check_globaltag(globaltag):
3910 self.globaltag_check_cache.
append(globaltag)
3912 msg =
"Something is wrong with GlobalTag `%s' " \
3913 "used by dataset `%s'!" % \
3914 (globaltag, dataset_name)
3915 if self.use_ref_hists:
3916 msg +=
"\n(Either it does not exist or it " \
3917 "does not contain the required key to " \
3918 "be used with reference histograms.)" 3920 msg +=
"\n(It probably just does not exist.)" 3921 self.logger.fatal(msg)
3927 runs_without_sites = [i
for (i, j)
in \
3928 self.datasets_information[dataset_name] \
3931 i
in self.datasets_to_use[dataset_name]]
3932 if len(runs_without_sites) > 0:
3933 for run_without_sites
in runs_without_sites:
3935 dataset_names_after_checks[dataset_name].
remove(run_without_sites)
3938 self.logger.
warning(
" removed %d unavailable run(s) " \
3939 "from dataset `%s'" % \
3940 (len(runs_without_sites), dataset_name))
3941 self.logger.
debug(
" (%s)" % \
3943 runs_without_sites]))
3949 if not self.harvesting_mode ==
"two-step":
3950 for run_number
in self.datasets_to_use[dataset_name]:
3955 num_sites = len(self.datasets_information[dataset_name] \
3956 [
"sites"][run_number])
3957 if num_sites > 1
and \
3958 not self.datasets_information[dataset_name] \
3959 [
"mirrored"][run_number]:
3963 msg =
" Dataset `%s', run %d is spread across more " \
3964 "than one site.\n" \
3965 " Cannot run single-step harvesting on " \
3966 "samples spread across multiple sites" % \
3967 (dataset_name, run_number)
3969 dataset_names_after_checks[dataset_name].
remove(run_number)
3973 "--> skipping" % msg)
3982 tmp = [j
for (i, j)
in self.datasets_information \
3983 [dataset_name][
"num_events"].
items() \
3984 if i
in self.datasets_to_use[dataset_name]]
3985 num_events_dataset = sum(tmp)
3987 if num_events_dataset < 1:
3988 msg =
" dataset `%s' is empty" % dataset_name
3989 del dataset_names_after_checks[dataset_name]
3991 "--> skipping" % msg)
4004 self.datasets_information[dataset_name] \
4005 [
"num_events"].
items()
if i[1] < 1]
4006 tmp = [i
for i
in tmp
if i[0]
in self.datasets_to_use[dataset_name]]
4007 empty_runs = dict(tmp)
4008 if len(empty_runs) > 0:
4009 for empty_run
in empty_runs:
4011 dataset_names_after_checks[dataset_name].
remove(empty_run)
4014 self.logger.
info(
" removed %d empty run(s) from dataset `%s'" % \
4015 (len(empty_runs), dataset_name))
4016 self.logger.
debug(
" (%s)" % \
4017 ", ".
join([
str(i)
for i
in empty_runs]))
4023 dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4024 for (dataset_name, runs)
in dataset_names_after_checks.items():
4026 self.logger.
warning(
" Removing dataset without any runs " \
4029 del dataset_names_after_checks_tmp[dataset_name]
4030 dataset_names_after_checks = dataset_names_after_checks_tmp
4034 self.logger.
warning(
" --> Removed %d dataset(s)" % \
4035 (len(self.datasets_to_use) -
4036 len(dataset_names_after_checks)))
4039 self.datasets_to_use = dataset_names_after_checks
4046 """Escape a DBS dataset name. 4048 Escape a DBS dataset name such that it does not cause trouble 4049 with the file system. This means turning each `/' into `__', 4050 except for the first one which is just removed. 4054 escaped_dataset_name = dataset_name
4055 escaped_dataset_name = escaped_dataset_name.strip(
"/")
4056 escaped_dataset_name = escaped_dataset_name.replace(
"/",
"__")
4058 return escaped_dataset_name
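        # Hedged usage sketch (dataset name is hypothetical, not from the
        # original script):
        #
        #   >>> self.escape_dataset_name("/RelValMinBias/CMSSW_3_8_2-MC_38Y_V9-v1/GEN-SIM-RECO")
        #   'RelValMinBias__CMSSW_3_8_2-MC_38Y_V9-v1__GEN-SIM-RECO'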
4065 """Generate the name of the configuration file to be run by 4068 Depending on the harvesting mode (single-step or two-step) 4069 this is the name of the real harvesting configuration or the 4070 name of the first-step ME summary extraction configuration. 4074 if self.harvesting_mode ==
"single-step":
4075 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4076 elif self.harvesting_mode ==
"single-step-allow-partial":
4077 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4084 elif self.harvesting_mode ==
"two-step":
4085 config_file_name = self.create_me_summary_config_file_name(dataset_name)
4087 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4088 self.harvesting_mode
4091 return config_file_name
4097 "Generate the name to be used for the harvesting config file." 4099 file_name_base =
"harvesting.py" 4100 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4101 config_file_name = file_name_base.replace(
".py",
4103 dataset_name_escaped)
4106 return config_file_name
4111 "Generate the name of the ME summary extraction config file." 4113 file_name_base =
"me_extraction.py" 4114 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4115 config_file_name = file_name_base.replace(
".py",
4117 dataset_name_escaped)
4120 return config_file_name
4125 """Create the name of the output file name to be used. 4127 This is the name of the output file of the `first step'. In 4128 the case of single-step harvesting this is already the final 4129 harvesting output ROOT file. In the case of two-step 4130 harvesting it is the name of the intermediary ME summary 4143 if self.harvesting_mode ==
"single-step":
4145 assert not run_number
is None 4147 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4148 elif self.harvesting_mode ==
"single-step-allow-partial":
4150 assert not run_number
is None 4152 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4153 elif self.harvesting_mode ==
"two-step":
4155 assert run_number
is None 4157 output_file_name = self.create_me_summary_output_file_name(dataset_name)
4160 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4161 self.harvesting_mode
4164 return output_file_name
4169 """Generate the name to be used for the harvesting output file. 4171 This harvesting output file is the _final_ ROOT output file 4172 containing the harvesting results. In case of two-step 4173 harvesting there is an intermediate ME output file as well. 4177 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4185 output_file_name =
"DQM_V0001_R%09d__%s.root" % \
4186 (run_number, dataset_name_escaped)
4187 if self.harvesting_mode.
find(
"partial") > -1:
4190 if self.datasets_information[dataset_name] \
4191 [
"mirrored"][run_number] ==
False:
4192 output_file_name = output_file_name.replace(
".root", \
4196 return output_file_name
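        # Hedged usage sketch (escaped dataset name and run number are
        # hypothetical): for run 123456 the single-step output file is
        #
        #   "DQM_V0001_R%09d__%s.root" % (123456, "Cosmics__Commissioning10-v4__RECO")
        #   --> 'DQM_V0001_R000123456__Cosmics__Commissioning10-v4__RECO.root'
        #
        # with the run number zero-padded to nine digits.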
4201 """Generate the name of the intermediate ME file name to be 4202 used in two-step harvesting. 4206 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4207 output_file_name =
"me_summary_%s.root" % \
4208 dataset_name_escaped
4211 return output_file_name
4216 """Create the block name to use for this dataset/run number. 4218 This is what appears in the brackets `[]' in multicrab.cfg. It 4219 is used as the name of the job and to create output 4224 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4225 block_name =
"%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4233 """Create a CRAB configuration for a given job. 4235 NOTE: This is _not_ a complete (as in: submittable) CRAB 4236 configuration. It is used to store the common settings for the 4237 multicrab configuration. 4239 NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported. 4241 NOTE: According to CRAB, you `Must define exactly two of 4242 total_number_of_events, events_per_job, or 4243 number_of_jobs.'. For single-step harvesting we force one job, 4244 for the rest we don't really care. 4247 # With the current version of CRAB (2.6.1), in which Daniele 4248 # fixed the behaviour of no_block_boundary for me, one _has to 4249 # specify_ the total_number_of_events and one single site in 4250 # the se_white_list. 4258 castor_prefix = self.castor_prefix
4260 tmp.append(self.config_file_header())
4265 tmp.append(
"[CRAB]")
4266 tmp.append(
"jobtype = cmssw")
4271 tmp.append(
"[GRID]")
4272 tmp.append(
"virtual_organization=cms")
4277 tmp.append(
"[USER]")
4278 tmp.append(
"copy_data = 1")
4283 tmp.append(
"[CMSSW]")
4284 tmp.append(
"# This reveals data hosted on T1 sites,")
4285 tmp.append(
"# which is normally hidden by CRAB.")
4286 tmp.append(
"show_prod = 1")
4287 tmp.append(
"number_of_jobs = 1")
4288 if self.Jsonlumi ==
True:
4289 tmp.append(
"lumi_mask = %s" % self.Jsonfilename)
4290 tmp.append(
"total_number_of_lumis = -1")
4292 if self.harvesting_type ==
"DQMOffline":
4293 tmp.append(
"total_number_of_lumis = -1")
4295 tmp.append(
"total_number_of_events = -1")
4296 if self.harvesting_mode.
find(
"single-step") > -1:
4297 tmp.append(
"# Force everything to run in one job.")
4298 tmp.append(
"no_block_boundary = 1")
4305 crab_config =
"\n".
join(tmp)
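        # For reference, a hedged sketch of what the above assembles into
        # (illustrative only; the exact lines depend on the options used):
        #
        #   [CRAB]
        #   jobtype = cmssw
        #
        #   [GRID]
        #   virtual_organization=cms
        #
        #   [USER]
        #   copy_data = 1
        #
        #   [CMSSW]
        #   show_prod = 1
        #   number_of_jobs = 1
        #   total_number_of_events = -1
        #   no_block_boundary = 1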
4313 """Create a multicrab.cfg file for all samples. 4315 This creates the contents for a multicrab.cfg file that uses 4316 the crab.cfg file (generated elsewhere) for the basic settings 4317 and contains blocks for each run of each dataset. 4320 # The fact that it's necessary to specify the se_white_list 4321 # and the total_number_of_events is due to our use of CRAB 4322 # version 2.6.1. This should no longer be necessary in the 4328 cmd=
"who i am | cut -f1 -d' '" 4329 (status, output)=subprocess.getstatusoutput(cmd)
4332 if self.caf_access ==
True:
4333 print(
"Extracting %s as user name" %UserName)
4335 number_max_sites = self.nr_max_sites + 1
4337 multicrab_config_lines = []
4338 multicrab_config_lines.append(self.config_file_header())
4339 multicrab_config_lines.append(
"")
4340 multicrab_config_lines.append(
"[MULTICRAB]")
4341 multicrab_config_lines.append(
"cfg = crab.cfg")
4342 multicrab_config_lines.append(
"")
4344 dataset_names = sorted(self.datasets_to_use.
keys())
4346 for dataset_name
in dataset_names:
4347 runs = self.datasets_to_use[dataset_name]
4348 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4349 castor_prefix = self.castor_prefix
4354 castor_dir = self.datasets_information[dataset_name] \
4355 [
"castor_path"][run]
4357 cmd =
"rfdir %s" % castor_dir
4358 (status, output) = subprocess.getstatusoutput(cmd)
4360 if len(output) <= 0:
4366 assert (len(self.datasets_information[dataset_name] \
4367 [
"sites"][run]) == 1)
or \
4368 self.datasets_information[dataset_name][
"mirrored"]
4371 site_names = self.datasets_information[dataset_name] \
4372 [
"sites"][run].
keys()
4374 for i
in range(1, number_max_sites, 1):
4375 if len(site_names) > 0:
4376 index =
"site_%02d" % (i)
4378 config_file_name = self. \
4380 output_file_name = self. \
4391 if len(site_names) > 1:
4392 cmssw_version = self.datasets_information[dataset_name] \
4394 self.logger.
info(
"Picking site for mirrored dataset " \
4396 (dataset_name, run))
4397 site_name = self.pick_a_site(site_names, cmssw_version)
4398 if site_name
in site_names:
4399 site_names.remove(site_name)
4402 site_name = site_names[0]
4403 site_names.remove(site_name)
4405 if site_name
is self.no_matching_site_found_str:
4409 nevents = self.datasets_information[dataset_name][
"num_events"][run]
4412 multicrab_block_name = self.create_multicrab_block_name( \
4413 dataset_name, run, index)
4414 multicrab_config_lines.append(
"[%s]" % \
4415 multicrab_block_name)
4419 if site_name ==
"caf.cern.ch":
4420 multicrab_config_lines.append(
"CRAB.use_server=0")
4421 multicrab_config_lines.append(
"CRAB.scheduler=caf")
4423 multicrab_config_lines.append(
"scheduler = glite")
4427 if site_name ==
"caf.cern.ch":
4430 multicrab_config_lines.append(
"GRID.se_white_list = %s" % \
4432 multicrab_config_lines.append(
"# This removes the default blacklisting of T1 sites.")
4433 multicrab_config_lines.append(
"GRID.remove_default_blacklist = 1")
4434 multicrab_config_lines.append(
"GRID.rb = CERN")
4435 if not self.non_t1access:
4436 multicrab_config_lines.append(
"GRID.role = t1access")
4441 castor_dir = castor_dir.replace(castor_prefix,
"")
4442 multicrab_config_lines.append(
"USER.storage_element=srm-cms.cern.ch")
4443 multicrab_config_lines.append(
"USER.user_remote_dir = %s" % \
4445 multicrab_config_lines.append(
"USER.check_user_remote_dir=0")
4447 if site_name ==
"caf.cern.ch":
4448 multicrab_config_lines.append(
"USER.storage_path=%s" % castor_prefix)
4454 multicrab_config_lines.append(
"USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4461 multicrab_config_lines.append(
"CMSSW.pset = %s" % \
4463 multicrab_config_lines.append(
"CMSSW.datasetpath = %s" % \
4465 multicrab_config_lines.append(
"CMSSW.runselection = %d" % \
4468 if self.Jsonlumi ==
True:
4471 if self.harvesting_type ==
"DQMOffline":
4474 multicrab_config_lines.append(
"CMSSW.total_number_of_events = %d" % \
4477 multicrab_config_lines.append(
"CMSSW.output_file = %s" % \
4482 if site_name ==
"caf.cern.ch":
4483 multicrab_config_lines.append(
"CAF.queue=cmscaf1nd")
4487 multicrab_config_lines.append(
"")
4491 self.all_sites_found =
True 4493 multicrab_config =
"\n".
join(multicrab_config_lines)
4496 return multicrab_config
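        # For reference, a hedged sketch of a single generated block
        # (site, dataset and run are hypothetical):
        #
        #   [Cosmics__Commissioning10-v4__RECO_000123456_site_01]
        #   scheduler = glite
        #   GRID.se_white_list = srm.some-t2.example
        #   GRID.remove_default_blacklist = 1
        #   GRID.rb = CERN
        #   GRID.role = t1access
        #   USER.storage_element=srm-cms.cern.ch
        #   USER.user_remote_dir = <castor_dir relative to castor_prefix>
        #   USER.check_user_remote_dir=0
        #   USER.storage_path=/srm/managerv2?SFN=<castor_prefix>
        #   CMSSW.pset = harvesting_Cosmics__Commissioning10-v4__RECO.py
        #   CMSSW.datasetpath = /Cosmics/Commissioning10-v4/RECO
        #   CMSSW.runselection = 123456
        #   CMSSW.output_file = DQM_V0001_R000123456__Cosmics__Commissioning10-v4__RECO.root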
4501 """Check if globaltag exists. 4503 Check if globaltag exists as GlobalTag in the database given 4504 by self.frontier_connection_name['globaltag']. If globaltag is 4505 None, self.globaltag is used instead. 4507 If we're going to use reference histograms this method also 4508 checks for the existence of the required key in the GlobalTag. 4512 if globaltag
is None:
4513 globaltag = self.globaltag
4516 if globaltag.endswith(
"::All"):
4517 globaltag = globaltag[:-5]
4519 connect_name = self.frontier_connection_name[
"globaltag"]
4527 connect_name = connect_name.replace(
"frontier://",
4528 "frontier://cmsfrontier:8000/")
4530 connect_name += self.db_account_name_cms_cond_globaltag()
4532 tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4536 tag_contains_ref_hist_key =
False 4537 if self.use_ref_hists
and tag_exists:
4539 tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4543 if self.use_ref_hists:
4544 ret_val = tag_exists
and tag_contains_ref_hist_key
4546 ret_val = tag_exists
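        # Hedged example (illustrative only): a GlobalTag passed as
        # "GR_R_38X_V13A::All" is checked under the name "GR_R_38X_V13A",
        # using the `globaltag' frontier connection rewritten to go via
        # cmsfrontier:8000 plus the account name returned by
        # db_account_name_cms_cond_globaltag().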
4556 """Check if globaltag exists. 4560 self.logger.
info(
"Checking existence of GlobalTag `%s'" % \
4562 self.logger.
debug(
" (Using database connection `%s')" % \
4565 cmd =
"cmscond_tagtree_list -c %s -T %s" % \
4566 (connect_name, globaltag)
4567 (status, output) = subprocess.getstatusoutput(cmd)
4569 output.find(
"error") > -1:
4570 msg =
"Could not check existence of GlobalTag `%s' in `%s'" % \
4571 (globaltag, connect_name)
4572 if output.find(
".ALL_TABLES not found") > -1:
4574 "Missing database account `%s'" % \
4575 (msg, output.split(
".ALL_TABLES")[0].
split()[-1])
4576 self.logger.fatal(msg)
4577 self.logger.
debug(
"Command used:")
4578 self.logger.
debug(
" %s" % cmd)
4579 self.logger.
debug(
"Output received:")
4580 self.logger.
debug(output)
4582 if output.find(
"does not exist") > -1:
4583 self.logger.
debug(
"GlobalTag `%s' does not exist in `%s':" % \
4584 (globaltag, connect_name))
4585 self.logger.
debug(
"Output received:")
4586 self.logger.
debug(output)
4590 self.logger.
info(
" GlobalTag exists? -> %s" % tag_exists)
4598 """Check if globaltag contains the required RefHistos key. 4603 tag_contains_key =
None 4604 ref_hist_key =
"RefHistos" 4605 self.logger.
info(
"Checking existence of reference " \
4606 "histogram key `%s' in GlobalTag `%s'" % \
4607 (ref_hist_key, globaltag))
4608 self.logger.
debug(
" (Using database connection `%s')" % \
4610 cmd =
"cmscond_tagtree_list -c %s -T %s -n %s" % \
4611 (connect_name, globaltag, ref_hist_key)
4612 (status, output) = subprocess.getstatusoutput(cmd)
4614 output.find(
"error") > -1:
4615 msg =
"Could not check existence of key `%s'" % \
4616 (ref_hist_key, connect_name)
4617 self.logger.fatal(msg)
4618 self.logger.
debug(
"Command used:")
4619 self.logger.
debug(
" %s" % cmd)
4620 self.logger.
debug(
"Output received:")
4621 self.logger.
debug(
" %s" % output)
4624 self.logger.
debug(
"Required key for use of reference " \
4625 "histograms `%s' does not exist " \
4626 "in GlobalTag `%s':" % \
4627 (ref_hist_key, globaltag))
4628 self.logger.
debug(
"Output received:")
4629 self.logger.
debug(output)
4630 tag_contains_key =
False 4632 tag_contains_key =
True 4634 self.logger.
info(
" GlobalTag contains `%s' key? -> %s" % \
4635 (ref_hist_key, tag_contains_key))
4638 return tag_contains_key
4643 """Check the existence of tag_name in database connect_name. 4645 Check if tag_name exists as a reference histogram tag in the 4646 database given by self.frontier_connection_name['refhists']. 4650 connect_name = self.frontier_connection_name[
"refhists"]
4651 connect_name += self.db_account_name_cms_cond_dqm_summary()
4653 self.logger.
debug(
"Checking existence of reference " \
4654 "histogram tag `%s'" % \
4656 self.logger.
debug(
" (Using database connection `%s')" % \
4659 cmd =
"cmscond_list_iov -c %s" % \
4661 (status, output) = subprocess.getstatusoutput(cmd)
4663 msg =
"Could not check existence of tag `%s' in `%s'" % \
4664 (tag_name, connect_name)
4665 self.logger.fatal(msg)
4666 self.logger.
debug(
"Command used:")
4667 self.logger.
debug(
" %s" % cmd)
4668 self.logger.
debug(
"Output received:")
4669 self.logger.
debug(output)
4671 if not tag_name
in output.split():
4672 self.logger.
debug(
"Reference histogram tag `%s' " \
4673 "does not exist in `%s'" % \
4674 (tag_name, connect_name))
4675 self.logger.
debug(
" Existing tags: `%s'" % \
4676 "', `".
join(output.split()))
4680 self.logger.
debug(
" Reference histogram tag exists? " \
4681 "-> %s" % tag_exists)
4689 """Create the Python harvesting configuration for harvesting. 4691 The basic configuration is created by 4692 Configuration.PyReleaseValidation.ConfigBuilder. (This mimics 4693 what cmsDriver.py does.) After that we add some specials 4696 NOTE: On one hand it may not be nice to circumvent 4697 cmsDriver.py, on the other hand cmsDriver.py does not really 4698 do anything itself. All the real work is done by the 4699 ConfigBuilder so there is not much risk that we miss out on 4700 essential developments of cmsDriver in the future. 4705 config_options = defaultOptions
4710 config_options.name =
"harvesting" 4711 config_options.scenario =
"pp" 4712 config_options.number = 1
4713 config_options.arguments = self.ident_string()
4714 config_options.evt_type = config_options.name
4715 config_options.customisation_file =
None 4716 config_options.filein =
"dummy_value" 4717 config_options.filetype =
"EDM" 4719 config_options.gflash =
"dummy_value" 4723 config_options.dbsquery =
"" 4730 config_options.step =
"HARVESTING:%s" % \
4731 self.harvesting_info[self.harvesting_type] \
4733 config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4735 config_options.eventcontent = self.harvesting_info \
4736 [self.harvesting_type] \
4738 config_options.harvesting = self.harvesting_info \
4739 [self.harvesting_type] \
4746 datatype = self.datasets_information[dataset_name][
"datatype"]
4747 config_options.isMC = (datatype.lower() ==
"mc")
4748 config_options.isData = (datatype.lower() ==
"data")
4749 globaltag = self.datasets_information[dataset_name][
"globaltag"]
4751 config_options.conditions = self.format_conditions_string(globaltag)
4755 if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4757 config_builder =
ConfigBuilder(config_options, with_input=
True)
4761 config_builder.prepare(
True)
4762 config_contents = config_builder.pythonCfgCode
4771 marker_lines.append(sep)
4772 marker_lines.append(
"# Code between these markers was generated by")
4773 marker_lines.append(
"# Configuration.PyReleaseValidation." \
4776 marker_lines.append(sep)
4777 marker =
"\n".
join(marker_lines)
4779 tmp = [self.config_file_header()]
4783 tmp.append(config_contents)
4787 config_contents =
"\n".
join(tmp)
4792 customisations = [
""]
4794 customisations.append(
"# Now follow some customisations")
4795 customisations.append(
"")
4796 connect_name = self.frontier_connection_name[
"globaltag"]
4797 connect_name += self.db_account_name_cms_cond_globaltag()
4798 customisations.append(
"process.GlobalTag.connect = \"%s\"" % \
4802 if self.saveByLumiSection ==
True:
4803 customisations.append(
"process.dqmSaver.saveByLumiSection = 1")
4807 customisations.append(
"")
4821 use_es_prefer = (self.harvesting_type ==
"RelVal")
4822 use_refs = use_es_prefer
or \
4823 (
not self.harvesting_type ==
"MC")
4825 use_refs = use_refs
and self.use_ref_hists
4833 customisations.append(
"print \"Not using reference histograms\"")
4834 customisations.append(
"if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4835 customisations.append(
" for (sequence_name, sequence) in process.sequences.items():")
4836 customisations.append(
" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4837 customisations.append(
" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4838 customisations.append(
" sequence_name")
4839 customisations.append(
"process.dqmSaver.referenceHandling = \"skip\"")
4843 customisations.append(
"process.dqmSaver.referenceHandling = \"all\"")
4845 es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4846 customisations.append(es_prefer_snippet)
4850 workflow_name = dataset_name
4851 if self.harvesting_mode ==
"single-step-allow-partial":
4852 workflow_name +=
"_partial" 4853 customisations.append(
"process.dqmSaver.workflow = \"%s\"" % \
4890 config_contents = config_contents +
"\n".
join(customisations)
4895 return config_contents
    def create_me_extraction_config(self, dataset_name):

        tmp = []
        tmp.append(self.config_file_header())
        tmp.append("")
        tmp.append("import FWCore.ParameterSet.Config as cms")
        tmp.append("")
        tmp.append("process = cms.Process(\"ME2EDM\")")
        tmp.append("")
        tmp.append("# Import of standard configurations")
        tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
        tmp.append("")
        tmp.append("# We don't really process any events, just keep this set to one to")
        tmp.append("# make sure things work.")
        tmp.append("process.maxEvents = cms.untracked.PSet(")
        tmp.append("    input = cms.untracked.int32(1)")
        tmp.append("    )")
        tmp.append("")
        tmp.append("process.options = cms.untracked.PSet(")
        tmp.append("    Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
        tmp.append("    )")
        tmp.append("")
        tmp.append("process.source = cms.Source(\"PoolSource\",")
        tmp.append("                            processingMode = \\")
        tmp.append("                            cms.untracked.string(\"RunsAndLumis\"),")
        tmp.append("                            fileNames = \\")
        tmp.append("                            cms.untracked.vstring(\"no_file_specified\")")
        tmp.append("                            )")
        tmp.append("")
        tmp.append("# Output definition: drop everything except for the monitoring.")
        tmp.append("process.output = cms.OutputModule(")
        tmp.append("    \"PoolOutputModule\",")
        tmp.append("    outputCommands = \\")
        tmp.append("    cms.untracked.vstring(\"drop *\", \\")
        tmp.append("                          \"keep *_MEtoEDMConverter_*_*\"),")
        output_file_name = self. \
                           create_me_summary_output_file_name(dataset_name)
        tmp.append("    fileName = \\")
        tmp.append("    cms.untracked.string(\"%s\")," % output_file_name)
        tmp.append("    dataset = cms.untracked.PSet(")
        tmp.append("        dataTier = cms.untracked.string(\"RECO\"),")
        tmp.append("        filterName = cms.untracked.string(\"\")")
        tmp.append("        )")
        tmp.append("    )")
        tmp.append("")
        tmp.append("# Additional output definition")
        tmp.append("process.out_step = cms.EndPath(process.output)")
        tmp.append("")
        tmp.append("# Schedule definition")
        tmp.append("process.schedule = cms.Schedule(process.out_step)")
        tmp.append("")

        config_contents = "\n".join(tmp)

        return config_contents
5012 """Write a CRAB job configuration Python file. 5016 self.logger.
info(
"Writing CRAB configuration...")
5018 file_name_base =
"crab.cfg" 5021 crab_contents = self.create_crab_config()
5024 crab_file_name = file_name_base
5026 crab_file =
file(crab_file_name,
"w")
5027 crab_file.write(crab_contents)
5030 self.logger.fatal(
"Could not write " \
5031 "CRAB configuration to file `%s'" % \
5033 raise Error(
"ERROR: Could not write to file `%s'!" % \
5041 """Write a multi-CRAB job configuration Python file. 5045 self.logger.
info(
"Writing multi-CRAB configuration...")
5047 file_name_base =
"multicrab.cfg" 5050 multicrab_contents = self.create_multicrab_config()
5053 multicrab_file_name = file_name_base
5055 multicrab_file =
file(multicrab_file_name,
"w")
5056 multicrab_file.write(multicrab_contents)
5057 multicrab_file.close()
5059 self.logger.fatal(
"Could not write " \
5060 "multi-CRAB configuration to file `%s'" % \
5061 multicrab_file_name)
5062 raise Error(
"ERROR: Could not write to file `%s'!" % \
5063 multicrab_file_name)
5070 """Write a harvesting job configuration Python file. 5072 NOTE: This knows nothing about single-step or two-step 5073 harvesting. That's all taken care of by 5074 create_harvesting_config. 5078 self.logger.
debug(
"Writing harvesting configuration for `%s'..." % \
5082 config_contents = self.create_harvesting_config(dataset_name)
5085 config_file_name = self. \
5088 config_file =
file(config_file_name,
"w")
5089 config_file.write(config_contents)
5092 self.logger.fatal(
"Could not write " \
5093 "harvesting configuration to file `%s'" % \
5095 raise Error(
"ERROR: Could not write to file `%s'!" % \
5103 """Write an ME-extraction configuration Python file. 5105 This `ME-extraction' (ME = Monitoring Element) is the first 5106 step of the two-step harvesting. 5110 self.logger.
debug(
"Writing ME-extraction configuration for `%s'..." % \
5114 config_contents = self.create_me_extraction_config(dataset_name)
5117 config_file_name = self. \
5120 config_file =
file(config_file_name,
"w")
5121 config_file.write(config_contents)
5124 self.logger.fatal(
"Could not write " \
5125 "ME-extraction configuration to file `%s'" % \
5127 raise Error(
"ERROR: Could not write to file `%s'!" % \
5136 """Check if we need to load and check the reference mappings. 5138 For data the reference histograms should be taken 5139 automatically from the GlobalTag, so we don't need any 5140 mappings. For RelVals we need to know a mapping to be used in 5141 the es_prefer code snippet (different references for each of 5144 WARNING: This implementation is a bit convoluted. 5150 if not dataset_name
is None:
5151 data_type = self.datasets_information[dataset_name] \
5153 mappings_needed = (data_type ==
"mc")
5155 if not mappings_needed:
5156 assert data_type ==
"data" 5159 tmp = [self.ref_hist_mappings_needed(dataset_name) \
5160 for dataset_name
in \
5161 self.datasets_information.
keys()]
5162 mappings_needed = (
True in tmp)
5165 return mappings_needed
5170 """Load the reference histogram mappings from file. 5172 The dataset name to reference histogram name mappings are read 5173 from a text file specified in self.ref_hist_mappings_file_name. 5178 assert len(self.ref_hist_mappings) < 1, \
5179 "ERROR Should not be RE-loading " \
5180 "reference histogram mappings!" 5183 self.logger.
info(
"Loading reference histogram mappings " \
5184 "from file `%s'" % \
5185 self.ref_hist_mappings_file_name)
5187 mappings_lines =
None 5189 mappings_file =
file(self.ref_hist_mappings_file_name,
"r") 5190 mappings_lines = mappings_file.readlines() 5191 mappings_file.close() 5193 msg =
"ERROR: Could not open reference histogram mapping "\
5194 "file `%s'" % self.ref_hist_mappings_file_name
5195 self.logger.fatal(msg)
5205 for mapping
in mappings_lines:
5207 if not mapping.startswith(
"#"):
5208 mapping = mapping.strip()
5209 if len(mapping) > 0:
5210 mapping_pieces = mapping.split()
5211 if len(mapping_pieces) != 2:
5212 msg =
"ERROR: The reference histogram mapping " \
5213 "file contains a line I don't " \
5214 "understand:\n %s" % mapping
5215 self.logger.fatal(msg)
5217 dataset_name = mapping_pieces[0].
strip()
5218 ref_hist_name = mapping_pieces[1].
strip()
5222 if dataset_name
in self.ref_hist_mappings:
5223 msg =
"ERROR: The reference histogram mapping " \
5224 "file contains multiple mappings for " \
5226 self.logger.fatal(msg)
5230 self.ref_hist_mappings[dataset_name] = ref_hist_name
5234 self.logger.
info(
" Successfully loaded %d mapping(s)" % \
5235 len(self.ref_hist_mappings))
5236 max_len =
max([len(i)
for i
in self.ref_hist_mappings.
keys()])
5237 for (map_from, map_to)
in self.ref_hist_mappings.
items():
5238 self.logger.
info(
" %-*s -> %s" % \
5239 (max_len, map_from, map_to))
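        # Hedged illustration of the expected mapping file format (dataset
        # and tag names are hypothetical): one whitespace-separated pair
        # per line, `#' starting a comment line:
        #
        #   # dataset name                                        reference histogram tag
        #   /RelValMinBias/CMSSW_3_8_2-MC_38Y_V9-v1/GEN-SIM-RECO  RelValMinBias_reference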
5246 """Make sure all necessary reference histograms exist. 5248 Check that for each of the datasets to be processed a 5249 reference histogram is specified and that that histogram 5250 exists in the database. 5252 NOTE: There's a little complication here. Since this whole 5253 thing was designed to allow (in principle) harvesting of both 5254 data and MC datasets in one go, we need to be careful to check 5255 the availability fof reference mappings only for those 5256 datasets that need it. 5260 self.logger.
info(
"Checking reference histogram mappings")
5262 for dataset_name
in self.datasets_to_use:
5264 ref_hist_name = self.ref_hist_mappings[dataset_name]
5266 msg =
"ERROR: No reference histogram mapping found " \
5267 "for dataset `%s'" % \
5269 self.logger.fatal(msg)
5272 if not self.check_ref_hist_tag(ref_hist_name):
5273 msg =
"Reference histogram tag `%s' " \
5274 "(used for dataset `%s') does not exist!" % \
5275 (ref_hist_name, dataset_name)
5276 self.logger.fatal(msg)
5279 self.logger.
info(
" Done checking reference histogram mappings.")
5286 """Obtain all information on the datasets that we need to run. 5288 Use DBS to figure out all required information on our 5289 datasets, like the run numbers and the GlobalTag. All 5290 information is stored in the datasets_information member 5305 self.datasets_information = {}
5306 self.logger.
info(
"Collecting information for all datasets to process")
5307 dataset_names = sorted(self.datasets_to_use.
keys())
5308 for dataset_name
in dataset_names:
5312 self.logger.
info(sep_line)
5313 self.logger.
info(
" `%s'" % dataset_name)
5314 self.logger.
info(sep_line)
5316 runs = self.dbs_resolve_runs(dataset_name)
5317 self.logger.
info(
" found %d run(s)" % len(runs))
5319 self.logger.
debug(
" run number(s): %s" % \
5320 ", ".
join([
str(i)
for i
in runs]))
5324 self.logger.
warning(
" --> skipping dataset " 5326 assert False,
"Panic: found a dataset without runs " \
5330 cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5331 self.logger.
info(
" found CMSSW version `%s'" % cmssw_version)
5334 datatype = self.dbs_resolve_datatype(dataset_name)
5335 self.logger.
info(
" sample is data or MC? --> %s" % \
5341 if self.globaltag
is None:
5342 globaltag = self.dbs_resolve_globaltag(dataset_name)
5344 globaltag = self.globaltag
5346 self.logger.
info(
" found GlobalTag `%s'" % globaltag)
5352 assert datatype ==
"data", \
5353 "ERROR Empty GlobalTag for MC dataset!!!" 5361 sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5365 for run_number
in sites_catalog.keys():
5366 num_events[run_number] = sites_catalog \
5367 [run_number][
"all_sites"]
5368 del sites_catalog[run_number][
"all_sites"]
5373 for run_number
in sites_catalog.keys():
5374 mirror_catalog[run_number] = sites_catalog \
5375 [run_number][
"mirrored"]
5376 del sites_catalog[run_number][
"mirrored"]
5405 self.datasets_information[dataset_name] = {}
5406 self.datasets_information[dataset_name][
"runs"] = runs
5407 self.datasets_information[dataset_name][
"cmssw_version"] = \
5409 self.datasets_information[dataset_name][
"globaltag"] = globaltag
5410 self.datasets_information[dataset_name][
"datatype"] = datatype
5411 self.datasets_information[dataset_name][
"num_events"] = num_events
5412 self.datasets_information[dataset_name][
"mirrored"] = mirror_catalog
5413 self.datasets_information[dataset_name][
"sites"] = sites_catalog
5417 castor_path_common = self.create_castor_path_name_common(dataset_name)
5418 self.logger.
info(
" output will go into `%s'" % \
5421 castor_paths = dict(list(
zip(runs,
5422 [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5424 for path_name
in castor_paths.values():
5425 self.logger.
debug(
" %s" % path_name)
5426 self.datasets_information[dataset_name][
"castor_path"] = \
5434 """Tell the user what to do now, after this part is done. 5436 This should provide the user with some (preferably 5437 copy-pasteable) instructions on what to do now with the setups 5438 and files that have been created. 5448 self.logger.
info(
"")
5449 self.logger.
info(sep_line)
5450 self.logger.
info(
" Configuration files have been created.")
5451 self.logger.
info(
" From here on please follow the usual CRAB instructions.")
5452 self.logger.
info(
" Quick copy-paste instructions are shown below.")
5453 self.logger.
info(sep_line)
5455 self.logger.
info(
"")
5456 self.logger.
info(
" Create all CRAB jobs:")
5457 self.logger.
info(
" multicrab -create")
5458 self.logger.
info(
"")
5459 self.logger.
info(
" Submit all CRAB jobs:")
5460 self.logger.
info(
" multicrab -submit")
5461 self.logger.
info(
"")
5462 self.logger.
info(
" Check CRAB status:")
5463 self.logger.
info(
" multicrab -status")
5464 self.logger.
info(
"")
5466 self.logger.
info(
"")
5467 self.logger.
info(
" For more information please see the CMS Twiki:")
5468 self.logger.
info(
" %s" % twiki_url)
5469 self.logger.
info(sep_line)
5473 if not self.all_sites_found:
5474 self.logger.
warning(
" For some of the jobs no matching " \
5475 "site could be found")
5476 self.logger.
warning(
" --> please scan your multicrab.cfg" \
5477 "for occurrences of `%s'." % \
5478 self.no_matching_site_found_str)
5479 self.logger.
warning(
" You will have to fix those " \
5487 "Main entry point of the CMS harvester." 5497 self.parse_cmd_line_options()
5499 self.check_input_status()
5512 self.setup_harvesting_info()
5515 self.build_dataset_use_list()
5517 self.build_dataset_ignore_list()
5520 self.build_runs_use_list()
5521 self.build_runs_ignore_list()
5528 self.process_dataset_ignore_list()
5532 self.build_datasets_information()
5534 if self.use_ref_hists
and \
5535 self.ref_hist_mappings_needed():
5538 self.load_ref_hist_mappings()
5542 self.check_ref_hist_mappings()
5544 self.logger.
info(
"No need to load reference " \
5545 "histogram mappings file")
5560 self.process_runs_use_and_ignore_lists()
5565 if self.harvesting_mode ==
"single-step-allow-partial":
5566 self.singlify_datasets()
5569 self.check_dataset_list()
5571 if len(self.datasets_to_use) < 1:
5572 self.logger.
info(
"After all checks etc. " \
5573 "there are no datasets (left?) " \
5577 self.logger.
info(
"After all checks etc. we are left " \
5578 "with %d dataset(s) to process " \
5579 "for a total of %d runs" % \
5580 (len(self.datasets_to_use),
5581 sum([len(i)
for i
in \
5582 self.datasets_to_use.
values()])))
5609 self.create_and_check_castor_dirs()
5613 self.write_crab_config()
5614 self.write_multicrab_config()
5624 for dataset_name
in self.datasets_to_use.
keys():
5626 self.write_harvesting_config(dataset_name)
5627 if self.harvesting_mode ==
"two-step":
5628 self.write_me_extraction_config(dataset_name)
5636 for run_number
in self.datasets_to_use[dataset_name]:
5637 tmp[run_number] = self.datasets_information \
5638 [dataset_name][
"num_events"][run_number]
5639 if dataset_name
in self.book_keeping_information:
5640 self.book_keeping_information[dataset_name].
update(tmp)
5642 self.book_keeping_information[dataset_name] = tmp
5645 self.show_exit_message()
5647 except Usage
as err:
5652 except Error
as err:
5656 except Exception
as err:
5665 if isinstance(err, SystemExit):
5666 self.logger.fatal(err.code)
5667 elif not isinstance(err, KeyboardInterrupt):
5668 self.logger.fatal(
"!" * 50)
5669 self.logger.fatal(
" This looks like a serious problem.")
5670 self.logger.fatal(
" If you are sure you followed all " \
5672 self.logger.fatal(
" please copy the below stack trace together")
5673 self.logger.fatal(
" with a description of what you were doing to")
5674 self.logger.fatal(
" jeroen.hegeman@cern.ch.")
5675 self.logger.fatal(
" %s" % self.ident_string())
5676 self.logger.fatal(
"!" * 50)
5677 self.logger.fatal(
str(err))
5679 traceback_string = traceback.format_exc()
5680 for line
in traceback_string.split(
"\n"):
5681 self.logger.fatal(line)
5682 self.logger.fatal(
"!" * 50)
5697 if self.crab_submission ==
True:
5698 os.system(
"multicrab -create")
5699 os.system(
"multicrab -submit")
if __name__ == "__main__":
    "Main entry point for harvesting."