15 """Main program to run all kinds of harvesting. 17 These are the basic kinds of harvesting implemented (contact me if 18 your favourite is missing): 20 - RelVal : Run for release validation samples. Makes heavy use of MC 23 - RelValFS: FastSim RelVal. 25 - MC : Run for MC samples. 27 - DQMOffline : Run for real data (could also be run for MC). 29 For the mappings of these harvesting types to sequence names please 30 see the setup_harvesting_info() and option_handler_list_types() 34 from __future__
import print_function
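# Example invocation (purely illustrative; see the option definitions
# in parse_cmd_line_options() below for the full set of flags):
#
#   cmsHarvester.py --harvesting_type=DQMOffline \
#                   --dataset=/Cosmics/Commissioning08-PromptReco-v2/RECO \
#                   --globaltag=GR09_R_34X_V2::All \
#                   --castordir=/castor/cern.ch/user/j/jdoe/harvesting
#
# The dataset name, GlobalTag and CASTOR directory above are
# hypothetical placeholders.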
__version__ = "3.8.2p1"
__author__ = "Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
             "Niklas Pietsch (niklas.pietsch@desy.de)"

twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"

import os
import sys
import commands
import re
import logging
import optparse
import datetime
import copy
import xml.sax
import six
from inspect import getargspec
from random import choice

from DBSAPI.dbsApi import DbsApi

from functools import reduce

global SAXParseException
from xml.sax import SAXParseException

import Configuration.PyReleaseValidation
from Configuration.PyReleaseValidation.ConfigBuilder import \
     ConfigBuilder, defaultOptions
class Usage(Exception):
    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        return repr(self.msg)


class Error(Exception):
    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        return repr(self.msg)
153 """Helper class to add some customised help output to cmsHarvester. 155 We want to add some instructions, as well as a pointer to the CMS 165 usage_lines.append(sep_line)
166 usage_lines.append(
"Welcome to the CMS harvester, a (hopefully useful)")
167 usage_lines.append(
"tool to create harvesting configurations.")
168 usage_lines.append(
"For more information please have a look at the CMS Twiki:")
169 usage_lines.append(
" %s" % twiki_url)
170 usage_lines.append(sep_line)
171 usage_lines.append(
"")
175 usage_lines.append(optparse.IndentedHelpFormatter. \
178 formatted_usage =
"\n".
join(usage_lines)
179 return formatted_usage
188 """XML handler class to parse DBS results. 190 The tricky thing here is that older DBS versions (2.0.5 and 191 earlier) return results in a different XML format than newer 192 versions. Previously the result values were returned as attributes 193 to the `result' element. The new approach returns result values as 194 contents of named elements. 196 The old approach is handled directly in startElement(), the new 197 approach in characters(). 199 NOTE: All results are returned in the form of string values of 210 "dataset.tag" :
"PROCESSEDDATASET_GLOBALTAG",
211 "datatype.type" :
"PRIMARYDSTYPE_TYPE",
212 "run" :
"RUNS_RUNNUMBER",
213 "run.number" :
"RUNS_RUNNUMBER",
214 "file.name" :
"FILES_LOGICALFILENAME",
215 "file.numevents" :
"FILES_NUMBEROFEVENTS",
216 "algo.version" :
"APPVERSION_VERSION",
217 "site" :
"STORAGEELEMENT_SENAME",
228 self.element_position.append(name)
237 key = DBSXMLHandler.mapping[name]
238 value =
str(attrs[key])
248 "closing unopenend element `%s'" % name
257 self.element_position.pop()
265 self.current_value.append(content)
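    # Rough sketch of the two formats described in the class docstring
    # (element and attribute names below are illustrative only, apart
    # from the column names taken from `mapping'):
    #
    #   old style: <result RUNS_RUNNUMBER='123456'/>
    #   new style: <runs_runnumber>123456</runs_runnumber>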
271 """Make sure that all results arrays have equal length. 273 We should have received complete rows from DBS. I.e. all 274 results arrays in the handler should be of equal length. 280 res_names = self.results.keys()
281 if len(res_names) > 1:
282 for res_name
in res_names[1:]:
283 res_tmp = self.
results[res_name]
284 if len(res_tmp) != len(self.
results[res_names[0]]):
285 results_valid =
False 296 """Class to perform CMS harvesting. 298 More documentation `obviously' to follow. 305 "Initialize class and process command line options." 332 "single-step-allow-partial",
362 for key
in self.frontier_connection_name.keys():
425 "dqm/offline/harvesting_output/" 485 if cmd_line_opts
is None:
486 cmd_line_opts = sys.argv[1:]
490 log_handler = logging.StreamHandler()
493 log_formatter = logging.Formatter(
"%(message)s")
494 log_handler.setFormatter(log_formatter)
495 logger = logging.getLogger()
497 logger.addHandler(log_handler)
509 "Clean up after ourselves." 521 "Create a timestamp to use in the created config files." 523 time_now = datetime.datetime.utcnow()
525 time_now = time_now.replace(microsecond = 0)
526 time_stamp =
"%sUTC" % datetime.datetime.isoformat(time_now)
534 "Spit out an identification string for cmsHarvester.py." 536 ident_str =
"`cmsHarvester.py " \
537 "version %s': cmsHarvester.py %s" % \
539 reduce(
lambda x, y: x+
' '+y, sys.argv[1:]))
546 """Create the conditions string needed for `cmsDriver'. 548 Just glueing the FrontierConditions bit in front of it really. 559 if globaltag.lower().
find(
"conditions") > -1:
560 conditions_string = globaltag
562 conditions_string =
"FrontierConditions_GlobalTag,%s" % \
566 return conditions_string
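    # For illustration: a GlobalTag of "GR10_P_V5::All" (hypothetical)
    # is turned into the conditions string
    # "FrontierConditions_GlobalTag,GR10_P_V5::All", whereas anything
    # already containing "conditions" is passed through unchanged.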
571 """Return the database account name used to store the GlobalTag. 573 The name of the database account depends (albeit weakly) on 574 the CMSSW release version. 580 account_name =
"CMS_COND_31X_GLOBALTAG" 588 """See db_account_name_cms_cond_globaltag.""" 591 version = self.cmssw_version[6:11]
592 if version <
"3_4_0":
593 account_name =
"CMS_COND_31X_DQM_SUMMARY" 595 account_name =
"CMS_COND_34X" 603 "Create a nice header to be used to mark the generated files." 609 tmp.append(
"# %s" % time_stamp)
610 tmp.append(
"# WARNING: This file was created automatically!")
612 tmp.append(
"# Created by %s" % ident_str)
614 header =
"\n".
join(tmp)
622 """Adjust the level of output generated. 625 - normal : default level of output 626 - quiet : less output than the default 627 - verbose : some additional information 628 - debug : lots more information, may be overwhelming 630 NOTE: The debug option is a bit special in the sense that it 631 also modifies the output format. 638 "NORMAL" : logging.INFO,
639 "QUIET" : logging.WARNING,
640 "VERBOSE" : logging.INFO,
641 "DEBUG" : logging.DEBUG
644 output_level = output_level.upper()
652 self.logger.fatal(
"Unknown output level `%s'" % ouput_level)
661 """Switch to debug mode. 663 This both increases the amount of output generated, as well as 664 changes the format used (more detailed information is given). 669 log_formatter_debug = logging.Formatter(
"[%(levelname)s] " \
679 log_handler = self.logger.handlers[0]
680 log_handler.setFormatter(log_formatter_debug)
688 "Switch to quiet mode: less verbose." 697 """Switch on `force mode' in which case we don't brake for nobody. 699 In so-called `force mode' all sanity checks are performed but 700 we don't halt on failure. Of course this requires some care 705 self.logger.debug(
"Switching on `force mode'.")
713 """Set the harvesting type to be used. 715 This checks that no harvesting type is already set, and sets 716 the harvesting type to be used to the one specified. If a 717 harvesting type is already set an exception is thrown. The 718 same happens when an unknown type is specified. 727 value = value.lower()
730 type_index = harvesting_types_lowered.index(value)
734 self.logger.fatal(
"Unknown harvesting type `%s'" % \
736 self.logger.fatal(
" possible types are: %s" %
738 raise Usage(
"Unknown harvesting type `%s'" % \
744 msg =
"Only one harvesting type should be specified" 745 self.logger.fatal(msg)
749 self.logger.info(
"Harvesting type to be used: `%s'" % \
757 """Set the harvesting mode to be used. 759 Single-step harvesting can be used for samples that are 760 located completely at a single site (= SE). Otherwise use 766 harvesting_mode = value.lower()
768 msg =
"Unknown harvesting mode `%s'" % harvesting_mode
769 self.logger.fatal(msg)
770 self.logger.fatal(
" possible modes are: %s" % \
777 msg =
"Only one harvesting mode should be specified" 778 self.logger.fatal(msg)
782 self.logger.info(
"Harvesting mode to be used: `%s'" % \
790 """Set the GlobalTag to be used, overriding our own choices. 792 By default the cmsHarvester will use the GlobalTag with which 793 a given dataset was created also for the harvesting. The 794 --globaltag option is the way to override this behaviour. 800 msg =
"Only one GlobalTag should be specified" 801 self.logger.fatal(msg)
805 self.logger.info(
"GlobalTag to be used: `%s'" % \
813 "Switch use of all reference histograms off." 817 self.logger.warning(
"Switching off all use of reference histograms")
825 """Override the default Frontier connection string. 827 Please only use this for testing (e.g. when a test payload has 828 been inserted into cms_orc_off instead of cms_orc_on). 830 This method gets called for three different command line 832 - --frontier-connection, 833 - --frontier-connection-for-globaltag, 834 - --frontier-connection-for-refhists. 835 Appropriate care has to be taken to make sure things are only 841 frontier_type = opt_str.split(
"-")[-1]
842 if frontier_type ==
"connection":
844 frontier_types = self.frontier_connection_name.keys()
846 frontier_types = [frontier_type]
850 for connection_name
in frontier_types:
852 msg =
"Please specify either:\n" \
853 " `--frontier-connection' to change the " \
854 "Frontier connection used for everything, or\n" \
855 "either one or both of\n" \
856 " `--frontier-connection-for-globaltag' to " \
857 "change the Frontier connection used for the " \
859 " `--frontier-connection-for-refhists' to change " \
860 "the Frontier connection used for the " \
861 "reference histograms." 862 self.logger.fatal(msg)
865 frontier_prefix =
"frontier://" 866 if not value.startswith(frontier_prefix):
867 msg =
"Expecting Frontier connections to start with " \
868 "`%s'. You specified `%s'." % \
869 (frontier_prefix, value)
870 self.logger.fatal(msg)
874 if value.find(
"FrontierProd") < 0
and \
875 value.find(
"FrontierProd") < 0:
876 msg =
"Expecting Frontier connections to contain either " \
877 "`FrontierProd' or `FrontierPrep'. You specified " \
878 "`%s'. Are you sure?" % \
880 self.logger.warning(msg)
882 if not value.endswith(
"/"):
885 for connection_name
in frontier_types:
889 frontier_type_str =
"unknown" 890 if connection_name ==
"globaltag":
891 frontier_type_str =
"the GlobalTag" 892 elif connection_name ==
"refhists":
893 frontier_type_str =
"the reference histograms" 895 self.logger.warning(
"Overriding default Frontier " \
896 "connection for %s " \
    def option_handler_input_spec(self, option, opt_str, value, parser):

        # Figure out if we were called for the `use these' or the
        # `ignore these' case.
        if opt_str.lower().find("ignore") > -1:
            spec_type = "ignore"
        else:
            spec_type = "use"

        # Similar: are we being called for datasets or for runs?
        if opt_str.lower().find("dataset") > -1:
            select_type = "datasets"
        else:
            select_type = "runs"

        if not self.input_method[select_type][spec_type] is None:
            msg = "Please only specify one input method " \
                  "(for the `%s' case)" % opt_str
            self.logger.fatal(msg)
            raise Usage(msg)

        input_method = opt_str.replace("-", "").replace("ignore", "")
        self.input_method[select_type][spec_type] = input_method
        self.input_name[select_type][spec_type] = value

        self.logger.debug("Input method for the `%s' case: %s" % \
                          (spec_type, input_method))
963 """Store the name of the file to be used for book keeping. 965 The only check done here is that only a single book keeping 973 msg =
"Only one book keeping file should be specified" 974 self.logger.fatal(msg)
978 self.logger.info(
"Book keeping file to be used: `%s'" % \
986 """Store the name of the file for the ref. histogram mapping. 993 msg =
"Only one reference histogram mapping file " \
994 "should be specified" 995 self.logger.fatal(msg)
999 self.logger.info(
"Reference histogram mapping file " \
1000 "to be used: `%s'" % \
1062 """Specify where on CASTOR the output should go. 1064 At the moment only output to CERN CASTOR is 1065 supported. Eventually the harvested results should go into the 1066 central place for DQM on CASTOR anyway. 1073 castor_prefix = self.castor_prefix
1076 castor_dir = os.path.join(os.path.sep, castor_dir)
1077 self.castor_base_dir = os.path.normpath(castor_dir)
1079 self.logger.info(
"CASTOR (base) area to be used: `%s'" % \
1080 self.castor_base_dir)
1087 """Set the self.no_t1access flag to try and create jobs that 1088 run without special `t1access' role. 1092 self.non_t1access =
True 1094 self.logger.warning(
"Running in `non-t1access' mode. " \
1095 "Will try to create jobs that run " \
1096 "without special rights but no " \
1097 "further promises...")
1104 """Set the self.caf_access flag to try and create jobs that 1108 self.caf_access =
True 1110 self.logger.warning(
"Running in `caf_access' mode. " \
1111 "Will try to create jobs that run " \
1113 "further promises...")
1120 """Set process.dqmSaver.saveByLumiSectiont=1 in cfg harvesting file 1122 self.saveByLumiSection =
True 1124 self.logger.warning(
"waning concerning saveByLumiSection option")
1132 """Crab jobs are not created and 1133 "submitted automatically", 1135 self.crab_submission =
True 1143 self.nr_max_sites = value
1149 self.preferred_site = value
1154 """List all harvesting types and their mappings. 1156 This lists all implemented harvesting types with their 1157 corresponding mappings to sequence names. This had to be 1158 separated out from the help since it depends on the CMSSW 1159 version and was making things a bit of a mess. 1161 NOTE: There is no way (at least not that I could come up with) 1162 to code this in a neat generic way that can be read both by 1163 this method and by setup_harvesting_info(). Please try hard to 1164 keep these two methods in sync! 1169 sep_line_short =
"-" * 20
1172 print(
"The following harvesting types are available:")
1175 print(
"`RelVal' maps to:")
1176 print(
" pre-3_3_0 : HARVESTING:validationHarvesting")
1177 print(
" 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting")
1178 print(
" Exceptions:")
1179 print(
" 3_3_0_pre1-4 : HARVESTING:validationHarvesting")
1180 print(
" 3_3_0_pre6 : HARVESTING:validationHarvesting")
1181 print(
" 3_4_0_pre1 : HARVESTING:validationHarvesting")
1183 print(sep_line_short)
1185 print(
"`RelValFS' maps to:")
1186 print(
" always : HARVESTING:validationHarvestingFS")
1188 print(sep_line_short)
1190 print(
"`MC' maps to:")
1191 print(
" always : HARVESTING:validationprodHarvesting")
1193 print(sep_line_short)
1195 print(
"`DQMOffline' maps to:")
1196 print(
" always : HARVESTING:dqmHarvesting")
1209 """Fill our dictionary with all info needed to understand 1212 This depends on the CMSSW version since at some point the 1213 names and sequences were modified. 1215 NOTE: There is no way (at least not that I could come up with) 1216 to code this in a neat generic way that can be read both by 1217 this method and by option_handler_list_types(). Please try 1218 hard to keep these two methods in sync! 1222 assert not self.cmssw_version
is None, \
1223 "ERROR setup_harvesting() requires " \
1224 "self.cmssw_version to be set!!!" 1226 harvesting_info = {}
1229 harvesting_info[
"DQMOffline"] = {}
1230 harvesting_info[
"DQMOffline"][
"beamspot"] =
None 1231 harvesting_info[
"DQMOffline"][
"eventcontent"] =
None 1232 harvesting_info[
"DQMOffline"][
"harvesting"] =
"AtRunEnd" 1234 harvesting_info[
"RelVal"] = {}
1235 harvesting_info[
"RelVal"][
"beamspot"] =
None 1236 harvesting_info[
"RelVal"][
"eventcontent"] =
None 1237 harvesting_info[
"RelVal"][
"harvesting"] =
"AtRunEnd" 1239 harvesting_info[
"RelValFS"] = {}
1240 harvesting_info[
"RelValFS"][
"beamspot"] =
None 1241 harvesting_info[
"RelValFS"][
"eventcontent"] =
None 1242 harvesting_info[
"RelValFS"][
"harvesting"] =
"AtRunEnd" 1244 harvesting_info[
"MC"] = {}
1245 harvesting_info[
"MC"][
"beamspot"] =
None 1246 harvesting_info[
"MC"][
"eventcontent"] =
None 1247 harvesting_info[
"MC"][
"harvesting"] =
"AtRunEnd" 1257 assert self.cmssw_version.startswith(
"CMSSW_")
1260 version = self.cmssw_version[6:]
1266 if version <
"3_3_0":
1267 step_string =
"validationHarvesting" 1268 elif version
in [
"3_3_0_pre1",
"3_3_0_pre2",
1269 "3_3_0_pre3",
"3_3_0_pre4",
1270 "3_3_0_pre6",
"3_4_0_pre1"]:
1271 step_string =
"validationHarvesting" 1273 step_string =
"validationHarvesting+dqmHarvesting" 1275 harvesting_info[
"RelVal"][
"step_string"] = step_string
1279 assert not step_string
is None, \
1280 "ERROR Could not decide a RelVal harvesting sequence " \
1281 "for CMSSW version %s" % self.cmssw_version
1287 step_string =
"validationHarvestingFS" 1289 harvesting_info[
"RelValFS"][
"step_string"] = step_string
1294 step_string =
"validationprodHarvesting" 1296 harvesting_info[
"MC"][
"step_string"] = step_string
1300 assert not step_string
is None, \
1301 "ERROR Could not decide a MC harvesting " \
1302 "sequence for CMSSW version %s" % self.cmssw_version
1308 step_string =
"dqmHarvesting" 1310 harvesting_info[
"DQMOffline"][
"step_string"] = step_string
1314 self.harvesting_info = harvesting_info
1316 self.logger.info(
"Based on the CMSSW version (%s) " \
1317 "I decided to use the `HARVESTING:%s' " \
1318 "sequence for %s harvesting" % \
1319 (self.cmssw_version,
1320 self.harvesting_info[self.harvesting_type][
"step_string"],
1321 self.harvesting_type))
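    # Worked example of the decision above (version string purely
    # illustrative): for self.cmssw_version = "CMSSW_3_8_2" and RelVal
    # harvesting, the version is neither pre-3_3_0 nor in the list of
    # exceptions, so the sequence used becomes
    # `HARVESTING:validationHarvesting+dqmHarvesting'.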
1328 """Build the common part of the output path to be used on 1331 This consists of the CASTOR area base path specified by the 1332 user and a piece depending on the data type (data vs. MC), the 1333 harvesting type and the dataset name followed by a piece 1334 containing the run number and event count. (See comments in 1335 create_castor_path_name_special for details.) This method 1336 creates the common part, without run number and event count. 1340 castor_path = self.castor_base_dir
1345 datatype = self.datasets_information[dataset_name][
"datatype"]
1346 datatype = datatype.lower()
1347 castor_path = os.path.join(castor_path, datatype)
1350 harvesting_type = self.harvesting_type
1351 harvesting_type = harvesting_type.lower()
1352 castor_path = os.path.join(castor_path, harvesting_type)
1362 release_version = self.cmssw_version
1363 release_version = release_version.lower(). \
1366 castor_path = os.path.join(castor_path, release_version)
1369 dataset_name_escaped = self.escape_dataset_name(dataset_name)
1370 castor_path = os.path.join(castor_path, dataset_name_escaped)
1374 castor_path = os.path.normpath(castor_path)
    def create_castor_path_name_special(self,
                                        dataset_name, run_number,
                                        castor_path_common):
        """Create the specialised part of the CASTOR output dir name.

        NOTE: To avoid clashes with `incremental harvesting'
        (re-harvesting when a dataset grows) we have to include the
        event count in the path name. The underlying `problem' is that
        CRAB does not overwrite existing output files so if the output
        file already exists CRAB will fail to copy back the output.

        NOTE: It's not possible to create different kinds of
        harvesting jobs in a single call to this tool. However, in
        principle it could be possible to create both data and MC jobs
        in a single go.

        NOTE: The number of events used in the path name is the
        _total_ number of events in the dataset/run at the time of
        harvesting. If we're doing partial harvesting the final
        results will reflect lower statistics. This is a) the easiest
        to code and b) the least likely to lead to confusion if
        someone ever decides to swap/copy around file blocks between
        sites.

        """

        castor_path = castor_path_common

        # The run number.
        castor_path = os.path.join(castor_path, "run_%d" % run_number)

        # The event count piece.
        castor_path = os.path.join(castor_path, "nevents")

        castor_path = os.path.normpath(castor_path)

        return castor_path
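    # For illustration (all names hypothetical): combining the common
    # and special parts above, a final CASTOR output path has the shape
    #   <castor_base_dir>/<datatype>/<harvesting_type>/<release>/<escaped_dataset>/run_<run_number>/<event count piece>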
1432 """Make sure all required CASTOR output dirs exist. 1434 This checks the CASTOR base dir specified by the user as well 1435 as all the subdirs required by the current set of jobs. 1439 self.logger.info(
"Checking (and if necessary creating) CASTOR " \
1440 "output area(s)...")
1443 self.create_and_check_castor_dir(self.castor_base_dir)
1447 for (dataset_name, runs)
in six.iteritems(self.datasets_to_use):
1450 castor_dirs.append(self.datasets_information[dataset_name] \
1451 [
"castor_path"][run])
1452 castor_dirs_unique = sorted(set(castor_dirs))
1456 ndirs = len(castor_dirs_unique)
1457 step =
max(ndirs / 10, 1)
1458 for (i, castor_dir)
in enumerate(castor_dirs_unique):
1459 if (i + 1) % step == 0
or \
1461 self.logger.info(
" %d/%d" % \
1463 self.create_and_check_castor_dir(castor_dir)
1470 self.logger.debug(
"Checking if path `%s' is empty" % \
1472 cmd =
"rfdir %s" % castor_dir
1473 (status, output) = commands.getstatusoutput(cmd)
1475 msg =
"Could not access directory `%s'" \
1476 " !!! This is bad since I should have just" \
1477 " created it !!!" % castor_dir
1478 self.logger.fatal(msg)
1481 self.logger.warning(
"Output directory `%s' is not empty:" \
1482 " new jobs will fail to" \
1483 " copy back output" % \
1491 """Check existence of the give CASTOR dir, if necessary create 1494 Some special care has to be taken with several things like 1495 setting the correct permissions such that CRAB can store the 1496 output results. Of course this means that things like 1497 /castor/cern.ch/ and user/j/ have to be recognised and treated 1500 NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for 1503 NOTE: This method uses some slightly tricky caching to make 1504 sure we don't keep over and over checking the same base paths. 1511 def split_completely(path):
1512 (parent_path, name) = os.path.split(path)
1514 return (parent_path, )
1516 return split_completely(parent_path) + (name, )
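        # For example, split_completely("/castor/cern.ch/cms") returns
        # ("/", "castor", "cern.ch", "cms").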
        # Local helper to parse the permissions from `rfstat' output.
        def extract_permissions(rfstat_output):
            """Parse the output from rfstat and return the
            5-digit permissions string."""

            permissions_line = [i for i in rfstat_output.split("\n") \
                                if i.lower().find("protection") > -1]
            regexp = re.compile(".*\(([0123456789]{5})\).*")
            match = regexp.search(rfstat_output)
            if not match or len(match.groups()) != 1:
                msg = "Could not extract permissions " \
                      "from output: %s" % rfstat_output
                self.logger.fatal(msg)
                raise Error(msg)
            permissions = match.group(1)

            return permissions
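        # For illustration (output format assumed, not verified here):
        # rfstat output contains a line like
        #   Protection              : -rwxrwxr-x (40775)
        # from which the regexp above extracts "40775".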
        # These are the pieces of CASTOR directories that we do not
        # want to touch (i.e. that we leave exactly as they are).
        castor_paths_dont_touch = {
            0: ["/", "castor", "cern.ch", "cms", "store", "temp",
                "dqm", "offline", "user"],
            -1: ["user", "store"]
            }

        self.logger.debug("Checking CASTOR path `%s'" % castor_dir)

        # First we take the full CASTOR path apart.
        castor_path_pieces = split_completely(castor_dir)

        # Now slowly rebuild the CASTOR path and see if a) all path
        # pieces exist and b) the final directory has the correct
        # permissions.
        path = ""
        check_sizes = sorted(castor_paths_dont_touch.keys())
        len_castor_path_pieces = len(castor_path_pieces)
        for piece_index in xrange(len_castor_path_pieces):
            skip_this_path_piece = False
            piece = castor_path_pieces[piece_index]

            # Check if this path piece should be left alone.
            for check_size in check_sizes:
                # Do we need to do this check?
                if (piece_index + check_size) > -1:
                    if castor_path_pieces[piece_index + check_size] in \
                           castor_paths_dont_touch[check_size]:
                        skip_this_path_piece = True

            # Add this piece to the path we are (re)building.
            path = os.path.join(path, piece)

            # Cache paths already checked (the cache is created lazily
            # the first time we get here).
            try:
                if path in self.castor_path_checks_cache:
                    continue
            except AttributeError:
                self.castor_path_checks_cache = []
            self.castor_path_checks_cache.append(path)

            # Unless we're supposed to leave this path piece alone,
            # check if it exists and create it if necessary.
            if not skip_this_path_piece:

                self.logger.debug("Checking if path `%s' exists" % \
                                  path)
                cmd = "rfstat %s" % path
                (status, output) = commands.getstatusoutput(cmd)
                if status != 0:
                    # Path does not exist: try to create it.
                    self.logger.debug("Creating path `%s'" % path)
                    cmd = "nsmkdir -m 775 %s" % path
                    (status, output) = commands.getstatusoutput(cmd)
                    if status != 0:
                        msg = "Could not create directory `%s'" % path
                        self.logger.fatal(msg)
                        raise Error(msg)
                    cmd = "rfstat %s" % path
                    (status, output) = commands.getstatusoutput(cmd)
                # It should now exist; check that it is a directory.
                permissions = extract_permissions(output)
                if not permissions.startswith("40"):
                    msg = "Path `%s' is not a directory(?)" % path
                    self.logger.fatal(msg)
                    raise Error(msg)

            # Check the permissions.
            self.logger.debug("Checking permissions for path `%s'" % path)
            cmd = "rfstat %s" % path
            (status, output) = commands.getstatusoutput(cmd)
            if status != 0:
                msg = "Could not obtain permissions for directory `%s'" % \
                      path
                self.logger.fatal(msg)
                raise Error(msg)
            # Keep only the last three digits of the permissions.
            permissions = extract_permissions(output)[-3:]

            # Now, if necessary, fix the permissions.
            if piece_index == (len_castor_path_pieces - 1):
                # This is the final destination directory.
                permissions_target = "775"
            else:
                # `Only' an intermediate directory.
                permissions_target = "775"

            # Compare digit by digit and keep the more permissive one.
            permissions_new = []
            for (i, j) in zip(permissions, permissions_target):
                permissions_new.append(max(i, j))
            permissions_new = "".join(permissions_new)
            self.logger.debug("  current permissions: %s" % \
                              permissions)
            self.logger.debug("  target permissions : %s" % \
                              permissions_target)
            if permissions_new != permissions:
                # We have to modify the permissions.
                self.logger.debug("Changing permissions of `%s' " \
                                  "to %s (were %s)" % \
                                  (path, permissions_new, permissions))
                cmd = "rfchmod %s %s" % (permissions_new, path)
                (status, output) = commands.getstatusoutput(cmd)
                if status != 0:
                    msg = "Could not change permissions for path `%s' " \
                          "to %s" % (path, permissions_new)
                    self.logger.fatal(msg)
                    raise Error(msg)
            else:
                self.logger.debug("  Permissions ok (%s)" % permissions_new)
    def pick_a_site(self, sites, cmssw_version):

        sites_forbidden = []

        if (self.preferred_site == "CAF") or \
           (self.preferred_site == "caf.cern.ch"):
            self.caf_access = True

        if self.caf_access == False:
            sites_forbidden.append("caf.cern.ch")

        # The known T1 sites (only a partial list is reproduced here).
        all_t1 = [
            "cmssrm-fzk.gridka.de",
            "gridka-dCache.fzk.de",
            "srm-cms.gridpp.rl.ac.uk",
            "srm.grid.sinica.edu.tw",
            "srm2.grid.sinica.edu.tw",
            "storm-fe-cms.cr.cnaf.infn.it"
            ]

        country_codes = {
            "CAF" : "caf.cern.ch",
            "CH"  : "srm-cms.cern.ch",
            "FR"  : "ccsrm.in2p3.fr",
            "DE"  : "cmssrm-fzk.gridka.de",
            "GOV" : "cmssrm.fnal.gov",
            "DE2" : "gridka-dCache.fzk.de",
            "UK"  : "srm-cms.gridpp.rl.ac.uk",
            "TW"  : "srm.grid.sinica.edu.tw",
            "TW2" : "srm2.grid.sinica.edu.tw",
            "ES"  : "srmcms.pic.es",
            "IT"  : "storm-fe-cms.cr.cnaf.infn.it"
            }

        if self.non_t1access:
            sites_forbidden.extend(all_t1)

        for site in sites_forbidden:
            if site in sites:
                sites.remove(site)

        if self.preferred_site in country_codes:
            self.preferred_site = country_codes[self.preferred_site]

        if self.preferred_site != "no preference":
            if self.preferred_site in sites:
                sites = [self.preferred_site]
            else:
                sites = []

        # Look for a site that has the required CMSSW version set up;
        # give preference to T1 sites (and the CAF).
        site_name = None
        cmd = None
        while len(sites) > 0 and \
              site_name is None:

            t1_sites = []
            for site in sites:
                if site in all_t1:
                    t1_sites.append(site)
                if site == "caf.cern.ch":
                    t1_sites.append(site)

            # If available, pick a T1 site, otherwise a random one.
            if len(t1_sites) > 0:
                se_name = choice(t1_sites)
            else:
                se_name = choice(sites)

            # Check (and if needed fill) our cache of which CMSSW
            # versions are available at which sites.
            if se_name in self.sites_and_versions_cache and \
               cmssw_version in self.sites_and_versions_cache[se_name]:
                if self.sites_and_versions_cache[se_name][cmssw_version]:
                    site_name = se_name
                else:
                    self.logger.debug("  --> rejecting site `%s'" % se_name)
                    sites.remove(se_name)
            else:
                self.logger.info("Checking if site `%s' " \
                                 "has CMSSW version `%s'" % \
                                 (se_name, cmssw_version))
                self.sites_and_versions_cache[se_name] = {}

                cmd = "lcg-info --list-ce " \
                      "--query " \
                      "Tag=VO-cms-%s," \
                      "CEStatus=Production," \
                      "CloseSE=%s" % \
                      (cmssw_version, se_name)
                (status, output) = commands.getstatusoutput(cmd)
                if status != 0:
                    self.logger.error("Could not check site information " \
                                      "for site `%s'" % se_name)
                else:
                    if (len(output) > 0) or (se_name == "caf.cern.ch"):
                        self.sites_and_versions_cache[se_name][cmssw_version] = True
                        site_name = se_name
                    else:
                        self.sites_and_versions_cache[se_name][cmssw_version] = False
                        self.logger.debug("  --> rejecting site `%s'" % se_name)
                        sites.remove(se_name)

        if site_name is self.no_matching_site_found_str:
            self.logger.error("  --> no matching site found")
            self.logger.error("  --> Your release or SCRAM " \
                              "architecture may not be available " \
                              "anywhere on the (LCG) grid.")
            if not cmd is None:
                self.logger.debug("  (command used: `%s')" % cmd)
        else:
            self.logger.debug("  --> selected site `%s'" % site_name)

        # If we still do not have a site, fall back to the `no
        # matching site found' marker and remember that at least one
        # dataset/run could not be matched to a site.
        if site_name is None:
            site_name = self.no_matching_site_found_str
            self.all_sites_found = False

        return site_name

    def parse_cmd_line_options(self):

        # Set up the command line parser. Note the customised help
        # formatter pointing people to the Twiki.
        parser = optparse.OptionParser(version="%s %s" % \
                                       ("%prog", self.version),
                                       formatter=CMSHarvesterHelpFormatter())

        self.option_parser = parser
        parser.add_option("-d", "--debug",
                          help="Switch on debug mode",
                          action="callback",
                          callback=self.option_handler_debug)

        parser.add_option("-q", "--quiet",
                          help="Be less verbose",
                          action="callback",
                          callback=self.option_handler_quiet)

        parser.add_option("", "--force",
                          help="Force mode. Do not abort on sanity check "
                               "failures",
                          action="callback",
                          callback=self.option_handler_force)

        parser.add_option("", "--harvesting_type",
                          help="Harvesting type: %s" % \
                               ", ".join(self.harvesting_types),
                          action="callback",
                          callback=self.option_handler_harvesting_type,
                          type="string",
                          metavar="HARVESTING_TYPE")

        parser.add_option("", "--harvesting_mode",
                          help="Harvesting mode: %s (default = %s)" % \
                               (", ".join(self.harvesting_modes),
                                self.harvesting_mode_default),
                          action="callback",
                          callback=self.option_handler_harvesting_mode,
                          type="string",
                          metavar="HARVESTING_MODE")

        parser.add_option("", "--globaltag",
                          help="GlobalTag to use. Default is the one " \
                               "the dataset was created with for MC; for data " \
                               "a GlobalTag has to be specified.",
                          action="callback",
                          callback=self.option_handler_globaltag,
                          type="string",
                          metavar="GLOBALTAG")

        parser.add_option("", "--no-ref-hists",
                          help="Don't use any reference histograms",
                          action="callback",
                          callback=self.option_handler_no_ref_hists)

        parser.add_option("", "--frontier-connection",
                          help="Use this Frontier connection to find " \
                               "GlobalTags and LocalTags (for reference " \
                               "histograms).\nPlease only use this for " \
                               "testing.",
                          action="callback",
                          callback=self.option_handler_frontier_connection,
                          type="string",
                          metavar="FRONTIER")

        parser.add_option("", "--frontier-connection-for-globaltag",
                          help="Use this Frontier connection to find " \
                               "GlobalTags.\nPlease only use this for " \
                               "testing.",
                          action="callback",
                          callback=self.option_handler_frontier_connection,
                          type="string",
                          metavar="FRONTIER")

        parser.add_option("", "--frontier-connection-for-refhists",
                          help="Use this Frontier connection to find " \
                               "LocalTags (for reference " \
                               "histograms).\nPlease only use this for " \
                               "testing.",
                          action="callback",
                          callback=self.option_handler_frontier_connection,
                          type="string",
                          metavar="FRONTIER")

        parser.add_option("", "--dataset",
                          help="Name (or regexp) of dataset(s) to process",
                          action="callback",
                          callback=self.option_handler_input_spec,
                          type="string",
                          metavar="DATASET")

        parser.add_option("", "--dataset-ignore",
                          help="Name (or regexp) of dataset(s) to ignore",
                          action="callback",
                          callback=self.option_handler_input_spec,
                          type="string",
                          metavar="DATASET-IGNORE")

        parser.add_option("", "--runs",
                          help="Run number(s) to process",
                          action="callback",
                          callback=self.option_handler_input_spec,
                          type="string",
                          metavar="RUNS")

        parser.add_option("", "--runs-ignore",
                          help="Run number(s) to ignore",
                          action="callback",
                          callback=self.option_handler_input_spec,
                          type="string",
                          metavar="RUNS-IGNORE")

        parser.add_option("", "--datasetfile",
                          help="File containing list of dataset names " \
                               "(or regexps) to process",
                          action="callback",
                          callback=self.option_handler_input_spec,
                          type="string",
                          metavar="DATASETFILE")

        parser.add_option("", "--datasetfile-ignore",
                          help="File containing list of dataset names " \
                               "(or regexps) to ignore",
                          action="callback",
                          callback=self.option_handler_input_spec,
                          type="string",
                          metavar="DATASETFILE-IGNORE")

        parser.add_option("", "--runslistfile",
                          help="File containing list of run numbers " \
                               "to process",
                          action="callback",
                          callback=self.option_handler_input_spec,
                          type="string",
                          metavar="RUNSLISTFILE")

        parser.add_option("", "--runslistfile-ignore",
                          help="File containing list of run numbers " \
                               "to ignore",
                          action="callback",
                          callback=self.option_handler_input_spec,
                          type="string",
                          metavar="RUNSLISTFILE-IGNORE")

        parser.add_option("", "--Jsonrunfile",
                          help="Jsonfile containing dictionary of run/lumisections pairs. " \
                               "All lumisections of runs contained in dictionary are processed.",
                          action="callback",
                          callback=self.option_handler_input_Jsonrunfile,
                          type="string",
                          metavar="JSONRUNFILE")

        parser.add_option("", "--Jsonfile",
                          help="Jsonfile containing dictionary of run/lumisections pairs. " \
                               "Only specified lumisections of runs contained in dictionary are processed.",
                          action="callback",
                          callback=self.option_handler_input_Jsonfile,
                          type="string",
                          metavar="JSONFILE")

        parser.add_option("", "--todo-file",
                          help="Todo file containing a list of runs to process.",
                          action="callback",
                          callback=self.option_handler_input_todofile,
                          type="string",
                          metavar="TODO-FILE")

        parser.add_option("", "--refhistmappingfile",
                          help="File to be used for the reference " \
                               "histogram mappings. Default: `%s'." % \
                               self.ref_hist_mappings_file_name_default,
                          action="callback",
                          callback=self.option_handler_ref_hist_mapping_file,
                          type="string",
                          metavar="REFHISTMAPPING-FILE")

        parser.add_option("", "--castordir",
                          help="Place on CASTOR to store results. " \
                               "Default: `%s'." % \
                               self.castor_base_dir_default,
                          action="callback",
                          callback=self.option_handler_castor_dir,
                          type="string",
                          metavar="CASTORDIR")

        parser.add_option("", "--no-t1access",
                          help="Try to create jobs that will run " \
                               "without special `t1access' role",
                          action="callback",
                          callback=self.option_handler_no_t1access)

        parser.add_option("", "--caf-access",
                          help="Crab jobs may run " \
                               "on CAF",
                          action="callback",
                          callback=self.option_handler_caf_access)

        parser.add_option("", "--saveByLumiSection",
                          help="Set saveByLumiSection=1 in harvesting cfg file",
                          action="callback",
                          callback=self.option_handler_saveByLumiSection)

        parser.add_option("", "--automatic-crab-submission",
                          help="Crab jobs are created and " \
                               "submitted automatically",
                          action="callback",
                          callback=self.option_handler_crab_submission)

        parser.add_option("", "--max-sites",
                          help="Max. number of sites each job is submitted to",
                          action="callback",
                          callback=self.option_handler_sites,
                          type="int")

        parser.add_option("", "--site",
                          help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
                          srm-cms.cern.ch : CH \
                          ccsrm.in2p3.fr : FR \
                          cmssrm-fzk.gridka.de : DE \
                          cmssrm.fnal.gov : GOV \
                          gridka-dCache.fzk.de : DE2 \
                          srm-cms.gridpp.rl.ac.uk : UK \
                          srm.grid.sinica.edu.tw : TW \
                          srm2.grid.sinica.edu.tw : TW2 \
                          srmcms.pic.es : ES \
                          storm-fe-cms.cr.cnaf.infn.it : IT",
                          action="callback",
                          callback=self.option_handler_preferred_site,
                          type="string")

        parser.add_option("-l", "--list",
                          help="List all harvesting types and their " \
                               "corresponding sequence names",
                          action="callback",
                          callback=self.option_handler_list_types)

        # If nothing was specified just show the help and exit.
        if len(self.cmd_line_opts) < 1:
            self.cmd_line_opts = ["--help"]

        # Some trickery: make sure the debug and quiet options are
        # processed first, so the output level is set before any other
        # output is produced.
        for i in ["-d", "--debug",
                  "-q", "--quiet"]:
            if i in self.cmd_line_opts:
                self.cmd_line_opts.remove(i)
                self.cmd_line_opts.insert(0, i)

        # Everything is set up, now parse what we were given.
        parser.set_defaults()
        (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2193 """Check completeness and correctness of input information. 2195 Check that all required information has been specified and 2196 that, at least as far as can be easily checked, it makes 2199 NOTE: This is also where any default values are applied. 2203 self.logger.info(
"Checking completeness/correctness of input...")
2207 if len(self.args) > 0:
2208 msg =
"Sorry but I don't understand `%s'" % \
2209 (
" ".
join(self.args))
2210 self.logger.fatal(msg)
2216 if self.harvesting_mode ==
"two-step":
2217 msg =
"--------------------\n" \
2218 " Sorry, but for the moment (well, till it works)" \
2219 " the two-step mode has been disabled.\n" \
2220 "--------------------\n" 2221 self.logger.fatal(msg)
2226 if self.harvesting_type
is None:
2227 msg =
"Please specify a harvesting type" 2228 self.logger.fatal(msg)
2231 if self.harvesting_mode
is None:
2232 self.harvesting_mode = self.harvesting_mode_default
2233 msg =
"No harvesting mode specified --> using default `%s'" % \
2234 self.harvesting_mode
2235 self.logger.warning(msg)
2241 if self.input_method[
"datasets"][
"use"]
is None:
2242 msg =
"Please specify an input dataset name " \
2243 "or a list file name" 2244 self.logger.fatal(msg)
2249 assert not self.input_name[
"datasets"][
"use"]
is None 2256 if self.use_ref_hists:
2257 if self.ref_hist_mappings_file_name
is None:
2258 self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2259 msg =
"No reference histogram mapping file specified --> " \
2260 "using default `%s'" % \
2261 self.ref_hist_mappings_file_name
2262 self.logger.warning(msg)
2268 if self.castor_base_dir
is None:
2269 self.castor_base_dir = self.castor_base_dir_default
2270 msg =
"No CASTOR area specified -> using default `%s'" % \
2271 self.castor_base_dir
2272 self.logger.warning(msg)
2276 if not self.castor_base_dir.startswith(self.castor_prefix):
2277 msg =
"CASTOR area does not start with `%s'" % \
2279 self.logger.fatal(msg)
2280 if self.castor_base_dir.find(
"castor") > -1
and \
2281 not self.castor_base_dir.find(
"cern.ch") > -1:
2282 self.logger.fatal(
"Only CERN CASTOR is supported")
2293 if self.globaltag
is None:
2294 self.logger.warning(
"No GlobalTag specified. This means I cannot")
2295 self.logger.warning(
"run on data, only on MC.")
2296 self.logger.warning(
"I will skip all data datasets.")
2301 if not self.globaltag
is None:
2302 if not self.globaltag.endswith(
"::All"):
2303 self.logger.warning(
"Specified GlobalTag `%s' does " \
2304 "not end in `::All' --> " \
2305 "appending this missing piece" % \
2307 self.globaltag =
"%s::All" % self.globaltag
2312 for (key, value)
in six.iteritems(self.frontier_connection_name):
2313 frontier_type_str =
"unknown" 2314 if key ==
"globaltag":
2315 frontier_type_str =
"the GlobalTag" 2316 elif key ==
"refhists":
2317 frontier_type_str =
"the reference histograms" 2319 if self.frontier_connection_overridden[key] ==
True:
2323 self.logger.info(
"Using %sdefault Frontier " \
2324 "connection for %s: `%s'" % \
2325 (non_str, frontier_type_str, value))
2334 """Check if CMSSW is setup. 2341 cmssw_version = os.getenv(
"CMSSW_VERSION")
2342 if cmssw_version
is None:
2343 self.logger.fatal(
"It seems CMSSW is not setup...")
2344 self.logger.fatal(
"($CMSSW_VERSION is empty)")
2345 raise Error(
"ERROR: CMSSW needs to be setup first!")
2347 self.cmssw_version = cmssw_version
2348 self.logger.info(
"Found CMSSW version %s properly set up" % \
2357 """Check if DBS is setup. 2364 dbs_home = os.getenv(
"DBSCMD_HOME")
2365 if dbs_home
is None:
2366 self.logger.fatal(
"It seems DBS is not setup...")
2367 self.logger.fatal(
" $DBSCMD_HOME is empty")
2368 raise Error(
"ERROR: DBS needs to be setup first!")
2386 self.logger.debug(
"Found DBS properly set up")
2394 """Setup the Python side of DBS. 2396 For more information see the DBS Python API documentation: 2397 https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation 2403 args[
"url"]=
"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2404 "servlet/DBSServlet" 2408 except DBSAPI.dbsApiException.DbsApiException
as ex:
2409 self.logger.fatal(
"Caught DBS API exception %s: %s " % \
2410 (ex.getClassName(), ex.getErrorMessage()))
2411 if ex.getErrorCode()
not in (
None,
""):
2412 logger.debug(
"DBS exception error code: ", ex.getErrorCode())
2420 """Use DBS to resolve a wildcarded dataset name. 2426 assert not self.dbs_api
is None 2431 if not (dataset_name.startswith(
"/")
and \
2432 dataset_name.endswith(
"RECO")):
2433 self.logger.warning(
"Dataset name `%s' does not sound " \
2434 "like a valid dataset name!" % \
2440 dbs_query =
"find dataset where dataset like %s " \
2441 "and dataset.status = VALID" % \
2444 api_result = api.executeQuery(dbs_query)
2445 except DBSAPI.dbsApiException.DbsApiException:
2446 msg =
"ERROR: Could not execute DBS query" 2447 self.logger.fatal(msg)
2452 parser = xml.sax.make_parser()
2453 parser.setContentHandler(handler)
2457 xml.sax.parseString(api_result, handler)
2458 except SAXParseException:
2459 msg =
"ERROR: Could not parse DBS server output" 2460 self.logger.fatal(msg)
2464 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2468 datasets = handler.results.values()[0]
2476 """Ask DBS for the CMSSW version used to create this dataset. 2482 assert not self.dbs_api
is None 2486 dbs_query =
"find algo.version where dataset = %s " \
2487 "and dataset.status = VALID" % \
2490 api_result = api.executeQuery(dbs_query)
2491 except DBSAPI.dbsApiException.DbsApiException:
2492 msg =
"ERROR: Could not execute DBS query" 2493 self.logger.fatal(msg)
2497 parser = xml.sax.make_parser()
2498 parser.setContentHandler(handler)
2501 xml.sax.parseString(api_result, handler)
2502 except SAXParseException:
2503 msg =
"ERROR: Could not parse DBS server output" 2504 self.logger.fatal(msg)
2508 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2511 cmssw_version = handler.results.values()[0]
2514 assert len(cmssw_version) == 1
2517 cmssw_version = cmssw_version[0]
2520 return cmssw_version
2570 """Ask DBS for the list of runs in a given dataset. 2572 # NOTE: This does not (yet?) skip/remove empty runs. There is 2573 # a bug in the DBS entry run.numevents (i.e. it always returns 2574 # zero) which should be fixed in the `next DBS release'. 2576 # https://savannah.cern.ch/bugs/?53452 2577 # https://savannah.cern.ch/bugs/?53711 2588 assert not self.dbs_api
is None 2592 dbs_query =
"find run where dataset = %s " \
2593 "and dataset.status = VALID" % \
2596 api_result = api.executeQuery(dbs_query)
2597 except DBSAPI.dbsApiException.DbsApiException:
2598 msg =
"ERROR: Could not execute DBS query" 2599 self.logger.fatal(msg)
2603 parser = xml.sax.make_parser()
2604 parser.setContentHandler(handler)
2607 xml.sax.parseString(api_result, handler)
2608 except SAXParseException:
2609 msg =
"ERROR: Could not parse DBS server output" 2610 self.logger.fatal(msg)
2614 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2617 runs = handler.results.values()[0]
2619 runs = sorted([
int(i)
for i
in runs])
2627 """Ask DBS for the globaltag corresponding to a given dataset. 2630 # This does not seem to work for data datasets? E.g. for 2631 # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO 2632 # Probaly due to the fact that the GlobalTag changed during 2640 assert not self.dbs_api
is None 2644 dbs_query =
"find dataset.tag where dataset = %s " \
2645 "and dataset.status = VALID" % \
2648 api_result = api.executeQuery(dbs_query)
2649 except DBSAPI.dbsApiException.DbsApiException:
2650 msg =
"ERROR: Could not execute DBS query" 2651 self.logger.fatal(msg)
2655 parser = xml.sax.make_parser()
2656 parser.setContentHandler(parser)
2659 xml.sax.parseString(api_result, handler)
2660 except SAXParseException:
2661 msg =
"ERROR: Could not parse DBS server output" 2662 self.logger.fatal(msg)
2666 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2669 globaltag = handler.results.values()[0]
2672 assert len(globaltag) == 1
2675 globaltag = globaltag[0]
2683 """Ask DBS for the the data type (data or mc) of a given 2690 assert not self.dbs_api
is None 2694 dbs_query =
"find datatype.type where dataset = %s " \
2695 "and dataset.status = VALID" % \
2698 api_result = api.executeQuery(dbs_query)
2699 except DBSAPI.dbsApiException.DbsApiException:
2700 msg =
"ERROR: Could not execute DBS query" 2701 self.logger.fatal(msg)
2705 parser = xml.sax.make_parser()
2706 parser.setContentHandler(handler)
2709 xml.sax.parseString(api_result, handler)
2710 except SAXParseException:
2711 msg =
"ERROR: Could not parse DBS server output" 2712 self.logger.fatal(msg)
2716 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2719 datatype = handler.results.values()[0]
2722 assert len(datatype) == 1
2725 datatype = datatype[0]
2736 """Determine the number of events in a given dataset (and run). 2738 Ask DBS for the number of events in a dataset. If a run number 2739 is specified the number of events returned is that in that run 2740 of that dataset. If problems occur we throw an exception. 2743 # Since DBS does not return the number of events correctly, 2744 # neither for runs nor for whole datasets, we have to work 2745 # around that a bit... 2752 assert not self.dbs_api
is None 2756 dbs_query =
"find file.name, file.numevents where dataset = %s " \
2757 "and dataset.status = VALID" % \
2759 if not run_number
is None:
2760 dbs_query = dbq_query + (
" and run = %d" % run_number)
2762 api_result = api.executeQuery(dbs_query)
2763 except DBSAPI.dbsApiException.DbsApiException:
2764 msg =
"ERROR: Could not execute DBS query" 2765 self.logger.fatal(msg)
2769 parser = xml.sax.make_parser()
2770 parser.setContentHandler(handler)
2773 xml.sax.parseString(api_result, handler)
2774 except SAXParseException:
2775 msg =
"ERROR: Could not parse DBS server output" 2776 self.logger.fatal(msg)
2780 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2783 num_events = sum(handler.results[
"file.numevents"])
3077 """Figure out the number of events in each run of this dataset. 3079 This is a more efficient way of doing this than calling 3080 dbs_resolve_number_of_events for each run. 3084 self.logger.debug(
"Checking spread of dataset `%s'" % dataset_name)
3088 assert not self.dbs_api
is None 3092 dbs_query =
"find run.number, site, file.name, file.numevents " \
3093 "where dataset = %s " \
3094 "and dataset.status = VALID" % \
3097 api_result = api.executeQuery(dbs_query)
3098 except DBSAPI.dbsApiException.DbsApiException:
3099 msg =
"ERROR: Could not execute DBS query" 3100 self.logger.fatal(msg)
3103 handler =
DBSXMLHandler([
"run.number",
"site",
"file.name",
"file.numevents"])
3104 parser = xml.sax.make_parser()
3105 parser.setContentHandler(handler)
3148 xml.sax.parseString(api_result, handler)
3149 except SAXParseException:
3150 msg =
"ERROR: Could not parse DBS server output" 3151 self.logger.fatal(msg)
3155 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 3162 for (index, site_name)
in enumerate(handler.results[
"site"]):
3172 if len(site_name) < 1:
3174 run_number =
int(handler.results[
"run.number"][index])
3175 file_name = handler.results[
"file.name"][index]
3176 nevents =
int(handler.results[
"file.numevents"][index])
3179 if run_number
not in files_info:
3181 files_info[run_number] = {}
3182 files_info[run_number][file_name] = (nevents,
3184 elif file_name
not in files_info[run_number]:
3186 files_info[run_number][file_name] = (nevents,
3193 assert nevents == files_info[run_number][file_name][0]
3195 files_info[run_number][file_name][1].
append(site_name)
3200 for run_number
in files_info.keys():
3201 files_without_sites = [i
for (i, j)
in \
3202 files_info[run_number].
items() \
3204 if len(files_without_sites) > 0:
3205 self.logger.warning(
"Removing %d file(s)" \
3206 " with empty site names" % \
3207 len(files_without_sites))
3208 for file_name
in files_without_sites:
3209 del files_info[run_number][file_name]
3215 num_events_catalog = {}
3216 for run_number
in files_info.keys():
3217 site_names =
list(set([j
for i
in files_info[run_number].
values()
for j
in i[1]]))
3223 if len(site_names) > 1:
3231 all_file_names = files_info[run_number].
keys()
3232 all_file_names = set(all_file_names)
3233 sites_with_complete_copies = []
3234 for site_name
in site_names:
3235 files_at_site = [i
for (i, (j, k)) \
3236 in files_info[run_number].
items() \
3238 files_at_site = set(files_at_site)
3239 if files_at_site == all_file_names:
3240 sites_with_complete_copies.append(site_name)
3241 if len(sites_with_complete_copies) < 1:
3247 if len(sites_with_complete_copies) > 1:
3266 self.logger.debug(
" -> run appears to be `mirrored'")
3268 self.logger.debug(
" -> run appears to be spread-out")
3271 len(sites_with_complete_copies) != len(site_names):
3275 for (file_name, (i, sites))
in files_info[run_number].
items():
3276 complete_sites = [site
for site
in sites \
3277 if site
in sites_with_complete_copies]
3278 files_info[run_number][file_name] = (i, complete_sites)
3279 site_names = sites_with_complete_copies
3281 self.logger.debug(
" for run #%d:" % run_number)
3282 num_events_catalog[run_number] = {}
3283 num_events_catalog[run_number][
"all_sites"] = sum([i[0]
for i
in files_info[run_number].
values()])
3284 if len(site_names) < 1:
3285 self.logger.debug(
" run is not available at any site")
3286 self.logger.debug(
" (but should contain %d events" % \
3287 num_events_catalog[run_number][
"all_sites"])
3289 self.logger.debug(
" at all sites combined there are %d events" % \
3290 num_events_catalog[run_number][
"all_sites"])
3291 for site_name
in site_names:
3292 num_events_catalog[run_number][site_name] = sum([i[0]
for i
in files_info[run_number].
values()
if site_name
in i[1]])
3293 self.logger.debug(
" at site `%s' there are %d events" % \
3294 (site_name, num_events_catalog[run_number][site_name]))
3295 num_events_catalog[run_number][
"mirrored"] = mirrored
3298 return num_events_catalog
3358 """Build a list of all datasets to be processed. 3367 if input_method
is None:
3369 elif input_method ==
"dataset":
3374 self.logger.info(
"Asking DBS for dataset names")
3375 dataset_names = self.dbs_resolve_dataset_name(input_name)
3376 elif input_method ==
"datasetfile":
3381 self.logger.info(
"Reading input from list file `%s'" % \
3384 listfile = open(
"/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name,
"r") 3385 print("open listfile")
3386 for dataset
in listfile:
3388 dataset_stripped = dataset.strip()
3389 if len(dataset_stripped) < 1:
3392 if dataset_stripped[0] !=
"#":
3393 dataset_names.extend(self. \
3397 msg =
"ERROR: Could not open input list file `%s'" % \
3399 self.logger.fatal(msg)
3404 assert False,
"Unknown input method `%s'" % input_method
3412 dataset_names = sorted(set(dataset_names))
3416 return dataset_names
3421 """Build a list of datasets to process. 3425 self.logger.info(
"Building list of datasets to consider...")
3427 input_method = self.input_method[
"datasets"][
"use"]
3428 input_name = self.input_name[
"datasets"][
"use"]
3429 dataset_names = self.build_dataset_list(input_method,
3432 [
None] * len(dataset_names))))
3434 self.logger.info(
" found %d dataset(s) to process:" % \
3436 for dataset
in dataset_names:
3437 self.logger.info(
" `%s'" % dataset)
3444 """Build a list of datasets to ignore. 3446 NOTE: We should always have a list of datasets to process, but 3447 it may be that we don't have a list of datasets to ignore. 3451 self.logger.info(
"Building list of datasets to ignore...")
3453 input_method = self.input_method[
"datasets"][
"ignore"]
3454 input_name = self.input_name[
"datasets"][
"ignore"]
3455 dataset_names = self.build_dataset_list(input_method,
3458 [
None] * len(dataset_names))))
3460 self.logger.info(
" found %d dataset(s) to ignore:" % \
3462 for dataset
in dataset_names:
3463 self.logger.info(
" `%s'" % dataset)
3475 if input_method
is None:
3477 elif input_method ==
"runs":
3480 self.logger.info(
"Reading list of runs from the " \
3482 runs.extend([
int(i.strip()) \
3483 for i
in input_name.split(
",") \
3484 if len(i.strip()) > 0])
3485 elif input_method ==
"runslistfile":
3487 self.logger.info(
"Reading list of runs from file `%s'" % \
3490 listfile = open(input_name,
"r") 3491 for run
in listfile:
3493 run_stripped = run.strip()
3494 if len(run_stripped) < 1:
3497 if run_stripped[0] !=
"#":
3498 runs.append(
int(run_stripped))
3501 msg =
"ERROR: Could not open input list file `%s'" % \
3503 self.logger.fatal(msg)
3509 assert False,
"Unknown input method `%s'" % input_method
3513 runs =
list(set(runs))
3521 """Build a list of runs to process. 3525 self.logger.info(
"Building list of runs to consider...")
3527 input_method = self.input_method[
"runs"][
"use"]
3528 input_name = self.input_name[
"runs"][
"use"]
3529 runs = self.build_runs_list(input_method, input_name)
3532 self.logger.info(
" found %d run(s) to process:" % \
3535 self.logger.info(
" %s" %
", ".
join([
str(i)
for i
in runs]))
3542 """Build a list of runs to ignore. 3544 NOTE: We should always have a list of runs to process, but 3545 it may be that we don't have a list of runs to ignore. 3549 self.logger.info(
"Building list of runs to ignore...")
3551 input_method = self.input_method[
"runs"][
"ignore"]
3552 input_name = self.input_name[
"runs"][
"ignore"]
3553 runs = self.build_runs_list(input_method, input_name)
3556 self.logger.info(
" found %d run(s) to ignore:" % \
3559 self.logger.info(
" %s" %
", ".
join([
str(i)
for i
in runs]))
3566 """Update the list of datasets taking into account the ones to 3569 Both lists have been generated before from DBS and both are 3570 assumed to be unique. 3572 NOTE: The advantage of creating the ignore list from DBS (in 3573 case a regexp is given) and matching that instead of directly 3574 matching the ignore criterion against the list of datasets (to 3575 consider) built from DBS is that in the former case we're sure 3576 that all regexps are treated exactly as DBS would have done 3577 without the cmsHarvester. 3579 NOTE: This only removes complete samples. Exclusion of single 3580 runs is done by the book keeping. So the assumption is that a 3581 user never wants to harvest just part (i.e. n out of N runs) 3586 self.logger.info(
"Processing list of datasets to ignore...")
3588 self.logger.debug(
"Before processing ignore list there are %d " \
3589 "datasets in the list to be processed" % \
3590 len(self.datasets_to_use))
3593 dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3594 for dataset_name
in self.datasets_to_use.keys():
3595 if dataset_name
in self.datasets_to_ignore.keys():
3596 del dataset_names_filtered[dataset_name]
3598 self.logger.info(
" --> Removed %d dataset(s)" % \
3599 (len(self.datasets_to_use) -
3600 len(dataset_names_filtered)))
3602 self.datasets_to_use = dataset_names_filtered
3604 self.logger.debug(
"After processing ignore list there are %d " \
3605 "datasets in the list to be processed" % \
3606 len(self.datasets_to_use))
3614 self.logger.info(
"Processing list of runs to use and ignore...")
3625 runs_to_use = self.runs_to_use
3626 runs_to_ignore = self.runs_to_ignore
3628 for dataset_name
in self.datasets_to_use:
3629 runs_in_dataset = self.datasets_information[dataset_name][
"runs"]
3632 runs_to_use_tmp = []
3633 for run
in runs_to_use:
3634 if not run
in runs_in_dataset:
3635 self.logger.warning(
"Dataset `%s' does not contain " \
3636 "requested run %d " \
3637 "--> ignoring `use' of this run" % \
3638 (dataset_name, run))
3640 runs_to_use_tmp.append(run)
3642 if len(runs_to_use) > 0:
3643 runs = runs_to_use_tmp
3644 self.logger.info(
"Using %d out of %d runs " \
3645 "of dataset `%s'" % \
3646 (len(runs), len(runs_in_dataset),
3649 runs = runs_in_dataset
3651 if len(runs_to_ignore) > 0:
3654 if not run
in runs_to_ignore:
3655 runs_tmp.append(run)
3656 self.logger.info(
"Ignoring %d out of %d runs " \
3657 "of dataset `%s'" % \
3658 (len(runs)- len(runs_tmp),
3659 len(runs_in_dataset),
3663 if self.todofile !=
"YourToDofile.txt":
3665 print(
"Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile)
3666 cmd=
"grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3667 (status, output)=commands.getstatusoutput(cmd)
3670 if run_str
in output:
3671 runs_todo.append(run)
3672 self.logger.info(
"Using %d runs " \
3673 "of dataset `%s'" % \
3679 if self.Jsonfilename !=
"YourJSON.txt":
3681 self.Jsonlumi =
True 3684 self.logger.info(
"Reading runs and lumisections from file `%s'" % \
3687 Jsonfile = open(self.Jsonfilename,
"r") 3688 for names
in Jsonfile:
3689 dictNames= eval(
str(names))
3690 for key
in dictNames:
3692 Json_runs.append(intkey)
3695 msg =
"ERROR: Could not open Jsonfile `%s'" % \
3697 self.logger.fatal(msg)
3700 if run
in Json_runs:
3701 good_runs.append(run)
3702 self.logger.info(
"Using %d runs " \
3703 "of dataset `%s'" % \
3707 if (self.Jsonrunfilename !=
"YourJSON.txt")
and (self.Jsonfilename ==
"YourJSON.txt"):
3711 self.logger.info(
"Reading runs from file `%s'" % \
3712 self.Jsonrunfilename)
3714 Jsonfile = open(self.Jsonrunfilename,
"r") 3715 for names
in Jsonfile:
3716 dictNames= eval(
str(names))
3717 for key
in dictNames:
3719 Json_runs.append(intkey)
3722 msg =
"ERROR: Could not open Jsonfile `%s'" % \
3724 self.logger.fatal(msg)
3727 if run
in Json_runs:
3728 good_runs.append(run)
3729 self.logger.info(
"Using %d runs " \
3730 "of dataset `%s'" % \
3735 self.datasets_to_use[dataset_name] = runs
3742 """Remove all but the largest part of all datasets. 3744 This allows us to harvest at least part of these datasets 3745 using single-step harvesting until the two-step approach 3751 assert self.harvesting_mode ==
"single-step-allow-partial" 3754 for dataset_name
in self.datasets_to_use:
3755 for run_number
in self.datasets_information[dataset_name][
"runs"]:
3756 max_events =
max(self.datasets_information[dataset_name][
"sites"][run_number].
values())
3757 sites_with_max_events = [i[0]
for i
in self.datasets_information[dataset_name][
"sites"][run_number].
items()
if i[1] == max_events]
3758 self.logger.warning(
"Singlifying dataset `%s', " \
3760 (dataset_name, run_number))
3761 cmssw_version = self.datasets_information[dataset_name] \
3763 selected_site = self.pick_a_site(sites_with_max_events,
3767 nevents_old = self.datasets_information[dataset_name][
"num_events"][run_number]
3768 self.logger.warning(
" --> " \
3769 "only harvesting partial statistics: " \
3770 "%d out of %d events (5.1%f%%) " \
3774 100. * max_events / nevents_old,
3776 self.logger.warning(
"!!! Please note that the number of " \
3777 "events in the output path name will " \
3778 "NOT reflect the actual statistics in " \
3779 "the harvested results !!!")
3786 self.datasets_information[dataset_name][
"sites"][run_number] = {selected_site: max_events}
3787 self.datasets_information[dataset_name][
"num_events"][run_number] = max_events
3795 """Check list of dataset names for impossible ones. 3797 Two kinds of checks are done: 3798 - Checks for things that do not make sense. These lead to 3799 errors and skipped datasets. 3800 - Sanity checks. For these warnings are issued but the user is 3801 considered to be the authoritative expert. 3804 - The CMSSW version encoded in the dataset name should match 3805 self.cmssw_version. This is critical. 3806 - There should be some events in the dataset/run. This is 3807 critical in the sense that CRAB refuses to create jobs for 3808 zero events. And yes, this does happen in practice. E.g. the 3809 reprocessed CRAFT08 datasets contain runs with zero events. 3810 - A cursory check is performed to see if the harvesting type 3811 makes sense for the data type. This should prevent the user 3812 from inadvertently running RelVal for data. 3813 - It is not possible to run single-step harvesting jobs on 3814 samples that are not fully contained at a single site. 3815 - Each dataset/run has to be available at at least one site. 3819 self.logger.info(
"Performing sanity checks on dataset list...")
3821 dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3823 for dataset_name
in self.datasets_to_use.keys():
3826 version_from_dataset = self.datasets_information[dataset_name] \
3828 if version_from_dataset != self.cmssw_version:
3829 msg =
" CMSSW version mismatch for dataset `%s' " \
3832 self.cmssw_version, version_from_dataset)
3833 if self.force_running:
3835 self.logger.warning(
"%s " \
3836 "--> `force mode' active: " \
3839 del dataset_names_after_checks[dataset_name]
3840 self.logger.warning(
"%s " \
3841 "--> skipping" % msg)
3852 datatype = self.datasets_information[dataset_name][
"datatype"]
3853 if datatype ==
"data":
3855 if self.harvesting_type !=
"DQMOffline":
3857 elif datatype ==
"mc":
3858 if self.harvesting_type ==
"DQMOffline":
3862 assert False,
"ERROR Impossible data type `%s' " \
3863 "for dataset `%s'" % \
3864 (datatype, dataset_name)
3866 msg =
" Normally one does not run `%s' harvesting " \
3867 "on %s samples, are you sure?" % \
3868 (self.harvesting_type, datatype)
3869 if self.force_running:
3870 self.logger.warning(
"%s " \
3871 "--> `force mode' active: " \
3874 del dataset_names_after_checks[dataset_name]
3875 self.logger.warning(
"%s " \
3876 "--> skipping" % msg)
3890 if datatype ==
"data":
3891 if self.globaltag
is None:
3892 msg =
"For data datasets (like `%s') " \
3893 "we need a GlobalTag" % \
3895 del dataset_names_after_checks[dataset_name]
3896 self.logger.warning(
"%s " \
3897 "--> skipping" % msg)
3907 globaltag = self.datasets_information[dataset_name][
"globaltag"]
3908 if not globaltag
in self.globaltag_check_cache:
3909 if self.check_globaltag(globaltag):
3910 self.globaltag_check_cache.append(globaltag)
3912 msg =
"Something is wrong with GlobalTag `%s' " \
3913 "used by dataset `%s'!" % \
3914 (globaltag, dataset_name)
3915 if self.use_ref_hists:
3916 msg +=
"\n(Either it does not exist or it " \
3917 "does not contain the required key to " \
3918 "be used with reference histograms.)" 3920 msg +=
"\n(It probably just does not exist.)" 3921 self.logger.fatal(msg)
3927 runs_without_sites = [i
for (i, j)
in \
3928 self.datasets_information[dataset_name] \
3931 i
in self.datasets_to_use[dataset_name]]
3932 if len(runs_without_sites) > 0:
3933 for run_without_sites
in runs_without_sites:
3935 dataset_names_after_checks[dataset_name].
remove(run_without_sites)
3938 self.logger.warning(
" removed %d unavailable run(s) " \
3939 "from dataset `%s'" % \
3940 (len(runs_without_sites), dataset_name))
3941 self.logger.debug(
" (%s)" % \
3943 runs_without_sites]))
3949 if not self.harvesting_mode ==
"two-step":
3950 for run_number
in self.datasets_to_use[dataset_name]:
3955 num_sites = len(self.datasets_information[dataset_name] \
3956 [
"sites"][run_number])
3957 if num_sites > 1
and \
3958 not self.datasets_information[dataset_name] \
3959 [
"mirrored"][run_number]:
3963 msg =
" Dataset `%s', run %d is spread across more " \
3964 "than one site.\n" \
3965 " Cannot run single-step harvesting on " \
3966 "samples spread across multiple sites" % \
3967 (dataset_name, run_number)
3969 dataset_names_after_checks[dataset_name].
remove(run_number)
3972 self.logger.warning(
"%s " \
3973 "--> skipping" % msg)
3982 tmp = [j
for (i, j)
in self.datasets_information \
3983 [dataset_name][
"num_events"].
items() \
3984 if i
in self.datasets_to_use[dataset_name]]
3985 num_events_dataset = sum(tmp)
3987 if num_events_dataset < 1:
3988 msg =
" dataset `%s' is empty" % dataset_name
3989 del dataset_names_after_checks[dataset_name]
3990 self.logger.warning(
"%s " \
3991 "--> skipping" % msg)
4004 self.datasets_information[dataset_name] \
4005 [
"num_events"].
items()
if i[1] < 1]
4006 tmp = [i
for i
in tmp
if i[0]
in self.datasets_to_use[dataset_name]]
4007 empty_runs =
dict(tmp)
4008 if len(empty_runs) > 0:
4009 for empty_run
in empty_runs:
4011 dataset_names_after_checks[dataset_name].
remove(empty_run)
4014 self.logger.info(
" removed %d empty run(s) from dataset `%s'" % \
4015 (len(empty_runs), dataset_name))
4016 self.logger.debug(
" (%s)" % \
4017 ", ".
join([
str(i)
for i
in empty_runs]))
4023 dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4024 for (dataset_name, runs)
in six.iteritems(dataset_names_after_checks):
4026 self.logger.warning(
" Removing dataset without any runs " \
4029 del dataset_names_after_checks_tmp[dataset_name]
4030 dataset_names_after_checks = dataset_names_after_checks_tmp
4034 self.logger.warning(
" --> Removed %d dataset(s)" % \
4035 (len(self.datasets_to_use) -
4036 len(dataset_names_after_checks)))
4039 self.datasets_to_use = dataset_names_after_checks
4046 """Escape a DBS dataset name. 4048 Escape a DBS dataset name such that it does not cause trouble 4049 with the file system. This means turning each `/' into `__', 4050 except for the first one which is just removed. 4054 escaped_dataset_name = dataset_name
4055 escaped_dataset_name = escaped_dataset_name.strip(
"/")
4056 escaped_dataset_name = escaped_dataset_name.replace(
"/",
"__")
4058 return escaped_dataset_name
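    # A quick illustration of the escaping, assuming a harvester-like
    # object `harvester' and a hypothetical dataset name:
    #
    #   >>> harvester.escape_dataset_name("/MinBias/Winter10-v1/RECO")
    #   'MinBias__Winter10-v1__RECO'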
4065 """Generate the name of the configuration file to be run by 4068 Depending on the harvesting mode (single-step or two-step) 4069 this is the name of the real harvesting configuration or the 4070 name of the first-step ME summary extraction configuration. 4074 if self.harvesting_mode ==
"single-step":
4075 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4076 elif self.harvesting_mode ==
"single-step-allow-partial":
4077 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4084 elif self.harvesting_mode ==
"two-step":
4085 config_file_name = self.create_me_summary_config_file_name(dataset_name)
4087 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4088 self.harvesting_mode
4091 return config_file_name
4097 "Generate the name to be used for the harvesting config file." 4099 file_name_base =
"harvesting.py" 4100 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4101 config_file_name = file_name_base.replace(
".py",
4103 dataset_name_escaped)
4106 return config_file_name
4111 "Generate the name of the ME summary extraction config file." 4113 file_name_base =
"me_extraction.py" 4114 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4115 config_file_name = file_name_base.replace(
".py",
4117 dataset_name_escaped)
4120 return config_file_name
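    # Illustration (hypothetical dataset name): for "/MinBias/Winter10-v1/RECO"
    # the two methods above produce
    #   harvesting_MinBias__Winter10-v1__RECO.py
    #   me_extraction_MinBias__Winter10-v1__RECO.py
    # i.e. the file name base with the escaped dataset name spliced in
    # before the ".py" extension.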
4125 """Create the name of the output file name to be used. 4127 This is the name of the output file of the `first step'. In 4128 the case of single-step harvesting this is already the final 4129 harvesting output ROOT file. In the case of two-step 4130 harvesting it is the name of the intermediary ME summary 4143 if self.harvesting_mode ==
"single-step":
4145 assert not run_number
is None 4147 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4148 elif self.harvesting_mode ==
"single-step-allow-partial":
4150 assert not run_number
is None 4152 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4153 elif self.harvesting_mode ==
"two-step":
4155 assert run_number
is None 4157 output_file_name = self.create_me_summary_output_file_name(dataset_name)
4160 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4161 self.harvesting_mode
4164 return output_file_name
4169 """Generate the name to be used for the harvesting output file. 4171 This harvesting output file is the _final_ ROOT output file 4172 containing the harvesting results. In case of two-step 4173 harvesting there is an intermediate ME output file as well. 4177 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4185 output_file_name =
"DQM_V0001_R%09d__%s.root" % \
4186 (run_number, dataset_name_escaped)
4187 if self.harvesting_mode.find(
"partial") > -1:
4190 if self.datasets_information[dataset_name] \
4191 [
"mirrored"][run_number] ==
False:
4192 output_file_name = output_file_name.replace(
".root", \
4196 return output_file_name
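    # Illustration (hypothetical values): run 123456 of dataset
    # "/MinBias/Winter10-v1/RECO" gives
    #   DQM_V0001_R000123456__MinBias__Winter10-v1__RECO.root
    # and, in `single-step-allow-partial' mode for a run that is not
    # completely mirrored at a single site, the alarming variant
    #   DQM_V0001_R000123456__MinBias__Winter10-v1__RECO_partial.root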
4201 """Generate the name of the intermediate ME file name to be 4202 used in two-step harvesting. 4206 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4207 output_file_name =
"me_summary_%s.root" % \
4208 dataset_name_escaped
4211 return output_file_name
4216 """Create the block name to use for this dataset/run number. 4218 This is what appears in the brackets `[]' in multicrab.cfg. It 4219 is used as the name of the job and to create output 4224 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4225 block_name =
"%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4233 """Create a CRAB configuration for a given job. 4235 NOTE: This is _not_ a complete (as in: submittable) CRAB 4236 configuration. It is used to store the common settings for the 4237 multicrab configuration. 4239 NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported. 4241 NOTE: According to CRAB, you `Must define exactly two of 4242 total_number_of_events, events_per_job, or 4243 number_of_jobs.'. For single-step harvesting we force one job, 4244 for the rest we don't really care. 4247 # With the current version of CRAB (2.6.1), in which Daniele 4248 # fixed the behaviour of no_block_boundary for me, one _has to 4249 # specify_ the total_number_of_events and one single site in 4250 # the se_white_list. 4258 castor_prefix = self.castor_prefix
4260 tmp.append(self.config_file_header())
4265 tmp.append(
"[CRAB]")
4266 tmp.append(
"jobtype = cmssw")
4271 tmp.append(
"[GRID]")
4272 tmp.append(
"virtual_organization=cms")
4277 tmp.append(
"[USER]")
4278 tmp.append(
"copy_data = 1")
4283 tmp.append(
"[CMSSW]")
4284 tmp.append(
"# This reveals data hosted on T1 sites,")
4285 tmp.append(
"# which is normally hidden by CRAB.")
4286 tmp.append(
"show_prod = 1")
4287 tmp.append(
"number_of_jobs = 1")
4288 if self.Jsonlumi ==
True:
4289 tmp.append(
"lumi_mask = %s" % self.Jsonfilename)
4290 tmp.append(
"total_number_of_lumis = -1")
4292 if self.harvesting_type ==
"DQMOffline":
4293 tmp.append(
"total_number_of_lumis = -1")
4295 tmp.append(
"total_number_of_events = -1")
4296 if self.harvesting_mode.find(
"single-step") > -1:
4297 tmp.append(
"# Force everything to run in one job.")
4298 tmp.append(
"no_block_boundary = 1")
4305 crab_config =
"\n".
join(tmp)
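    # For reference, the common crab.cfg assembled above boils down to a
    # skeleton like the following (the Jsonlumi/DQMOffline variations only
    # change the event/lumi lines):
    #
    #   [CRAB]
    #   jobtype = cmssw
    #
    #   [GRID]
    #   virtual_organization=cms
    #
    #   [USER]
    #   copy_data = 1
    #
    #   [CMSSW]
    #   # This reveals data hosted on T1 sites,
    #   # which is normally hidden by CRAB.
    #   show_prod = 1
    #   number_of_jobs = 1
    #   total_number_of_events = -1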
4313 """Create a multicrab.cfg file for all samples. 4315 This creates the contents for a multicrab.cfg file that uses 4316 the crab.cfg file (generated elsewhere) for the basic settings 4317 and contains blocks for each run of each dataset. 4320 # The fact that it's necessary to specify the se_white_list 4321 # and the total_number_of_events is due to our use of CRAB 4322 # version 2.6.1. This should no longer be necessary in the 4328 cmd=
"who i am | cut -f1 -d' '" 4329 (status, output)=commands.getstatusoutput(cmd)
4332 if self.caf_access ==
True:
4333 print(
"Extracting %s as user name" %UserName)
4335 number_max_sites = self.nr_max_sites + 1
4337 multicrab_config_lines = []
4338 multicrab_config_lines.append(self.config_file_header())
4339 multicrab_config_lines.append(
"")
4340 multicrab_config_lines.append(
"[MULTICRAB]")
4341 multicrab_config_lines.append(
"cfg = crab.cfg")
4342 multicrab_config_lines.append(
"")
4344 dataset_names = sorted(self.datasets_to_use.keys())
4346 for dataset_name
in dataset_names:
4347 runs = self.datasets_to_use[dataset_name]
4348 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4349 castor_prefix = self.castor_prefix
4354 castor_dir = self.datasets_information[dataset_name] \
4355 [
"castor_path"][run]
4357 cmd =
"rfdir %s" % castor_dir
4358 (status, output) = commands.getstatusoutput(cmd)
4360 if len(output) <= 0:
4366 assert (len(self.datasets_information[dataset_name] \
4367 [
"sites"][run]) == 1)
or \
4368 self.datasets_information[dataset_name][
"mirrored"]
4371 site_names = self.datasets_information[dataset_name] \
4372 [
"sites"][run].
keys()
4374 for i
in range(1, number_max_sites, 1):
4375 if len(site_names) > 0:
4376 index =
"site_%02d" % (i)
4378 config_file_name = self. \
4380 output_file_name = self. \
4391 if len(site_names) > 1:
4392 cmssw_version = self.datasets_information[dataset_name] \
4394 self.logger.info(
"Picking site for mirrored dataset " \
4396 (dataset_name, run))
4397 site_name = self.pick_a_site(site_names, cmssw_version)
4398 if site_name
in site_names:
4399 site_names.remove(site_name)
4402 site_name = site_names[0]
4403 site_names.remove(site_name)
4405 if site_name
is self.no_matching_site_found_str:
4409 nevents = self.datasets_information[dataset_name][
"num_events"][run]
4412 multicrab_block_name = self.create_multicrab_block_name( \
4413 dataset_name, run, index)
4414 multicrab_config_lines.append(
"[%s]" % \
4415 multicrab_block_name)
4419 if site_name ==
"caf.cern.ch":
4420 multicrab_config_lines.append(
"CRAB.use_server=0")
4421 multicrab_config_lines.append(
"CRAB.scheduler=caf")
4423 multicrab_config_lines.append(
"scheduler = glite")
4427 if site_name ==
"caf.cern.ch":
4430 multicrab_config_lines.append(
"GRID.se_white_list = %s" % \
4432 multicrab_config_lines.append(
"# This removes the default blacklisting of T1 sites.")
4433 multicrab_config_lines.append(
"GRID.remove_default_blacklist = 1")
4434 multicrab_config_lines.append(
"GRID.rb = CERN")
4435 if not self.non_t1access:
4436 multicrab_config_lines.append(
"GRID.role = t1access")
4441 castor_dir = castor_dir.replace(castor_prefix,
"")
4442 multicrab_config_lines.append(
"USER.storage_element=srm-cms.cern.ch")
4443 multicrab_config_lines.append(
"USER.user_remote_dir = %s" % \
4445 multicrab_config_lines.append(
"USER.check_user_remote_dir=0")
4447 if site_name ==
"caf.cern.ch":
4448 multicrab_config_lines.append(
"USER.storage_path=%s" % castor_prefix)
4454 multicrab_config_lines.append(
"USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4461 multicrab_config_lines.append(
"CMSSW.pset = %s" % \
4463 multicrab_config_lines.append(
"CMSSW.datasetpath = %s" % \
4465 multicrab_config_lines.append(
"CMSSW.runselection = %d" % \
4468 if self.Jsonlumi ==
True:
4471 if self.harvesting_type ==
"DQMOffline":
4474 multicrab_config_lines.append(
"CMSSW.total_number_of_events = %d" % \
4477 multicrab_config_lines.append(
"CMSSW.output_file = %s" % \
4482 if site_name ==
"caf.cern.ch":
4483 multicrab_config_lines.append(
"CAF.queue=cmscaf1nd")
4487 multicrab_config_lines.append(
"")
4491 self.all_sites_found =
True 4493 multicrab_config =
"\n".
join(multicrab_config_lines)
4496 return multicrab_config
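    # For reference, each generated multicrab.cfg block looks roughly like
    # this (hypothetical dataset, run, site, event count and CASTOR path):
    #
    #   [MinBias__Winter10-v1__RECO_000123456_site_01]
    #   scheduler = glite
    #   GRID.se_white_list = srm.example-site.org
    #   GRID.remove_default_blacklist = 1
    #   GRID.rb = CERN
    #   GRID.role = t1access
    #   USER.storage_element=srm-cms.cern.ch
    #   USER.user_remote_dir = dqm/offline/harvesting_output/...
    #   USER.check_user_remote_dir=0
    #   USER.storage_path=/srm/managerv2?SFN=/castor/cern.ch/
    #   CMSSW.pset = harvesting_MinBias__Winter10-v1__RECO.py
    #   CMSSW.datasetpath = /MinBias/Winter10-v1/RECO
    #   CMSSW.runselection = 123456
    #   CMSSW.total_number_of_events = 10000
    #   CMSSW.output_file = DQM_V0001_R000123456__MinBias__Winter10-v1__RECO.root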
4501 """Check if globaltag exists. 4503 Check if globaltag exists as GlobalTag in the database given 4504 by self.frontier_connection_name['globaltag']. If globaltag is 4505 None, self.globaltag is used instead. 4507 If we're going to use reference histograms this method also 4508 checks for the existence of the required key in the GlobalTag. 4512 if globaltag
is None:
4513 globaltag = self.globaltag
4516 if globaltag.endswith(
"::All"):
4517 globaltag = globaltag[:-5]
4519 connect_name = self.frontier_connection_name[
"globaltag"]
4527 connect_name = connect_name.replace(
"frontier://",
4528 "frontier://cmsfrontier:8000/")
4530 connect_name += self.db_account_name_cms_cond_globaltag()
4532 tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4536 tag_contains_ref_hist_key =
False 4537 if self.use_ref_hists
and tag_exists:
4539 tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4543 if self.use_ref_hists:
4544 ret_val = tag_exists
and tag_contains_ref_hist_key
4546 ret_val = tag_exists
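    # Illustration of the connection-name juggling above (hypothetical
    # connection name and account suffix): a configured name like
    #   frontier://FrontierProd/
    # is first pointed directly at the Frontier servlet and then gets the
    # conditions account appended, ending up as something like
    #   frontier://cmsfrontier:8000/FrontierProd/CMS_COND_31X_GLOBALTAG
    # which is the form passed via `-c' to the cmscond_* command line tools.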
4556 """Check if globaltag exists. 4560 self.logger.info(
"Checking existence of GlobalTag `%s'" % \
4562 self.logger.debug(
" (Using database connection `%s')" % \
4565 cmd =
"cmscond_tagtree_list -c %s -T %s" % \
4566 (connect_name, globaltag)
4567 (status, output) = commands.getstatusoutput(cmd)
4569 output.find(
"error") > -1:
4570 msg =
"Could not check existence of GlobalTag `%s' in `%s'" % \
4571 (globaltag, connect_name)
4572 if output.find(
".ALL_TABLES not found") > -1:
4574 "Missing database account `%s'" % \
4575 (msg, output.split(
".ALL_TABLES")[0].
split()[-1])
4576 self.logger.fatal(msg)
4577 self.logger.debug(
"Command used:")
4578 self.logger.debug(
" %s" % cmd)
4579 self.logger.debug(
"Output received:")
4580 self.logger.debug(output)
4582 if output.find(
"does not exist") > -1:
4583 self.logger.debug(
"GlobalTag `%s' does not exist in `%s':" % \
4584 (globaltag, connect_name))
4585 self.logger.debug(
"Output received:")
4586 self.logger.debug(output)
4590 self.logger.info(
" GlobalTag exists? -> %s" % tag_exists)
4598 """Check if globaltag contains the required RefHistos key. 4603 tag_contains_key =
None 4604 ref_hist_key =
"RefHistos" 4605 self.logger.info(
"Checking existence of reference " \
4606 "histogram key `%s' in GlobalTag `%s'" % \
4607 (ref_hist_key, globaltag))
4608 self.logger.debug(
" (Using database connection `%s')" % \
4610 cmd =
"cmscond_tagtree_list -c %s -T %s -n %s" % \
4611 (connect_name, globaltag, ref_hist_key)
4612 (status, output) = commands.getstatusoutput(cmd)
4614 output.find(
"error") > -1:
4615 msg =
"Could not check existence of key `%s'" % \
4616 (ref_hist_key, connect_name)
4617 self.logger.fatal(msg)
4618 self.logger.debug(
"Command used:")
4619 self.logger.debug(
" %s" % cmd)
4620 self.logger.debug(
"Output received:")
4621 self.logger.debug(
" %s" % output)
4624 self.logger.debug(
"Required key for use of reference " \
4625 "histograms `%s' does not exist " \
4626 "in GlobalTag `%s':" % \
4627 (ref_hist_key, globaltag))
4628 self.logger.debug(
"Output received:")
4629 self.logger.debug(output)
4630 tag_contains_key =
False 4632 tag_contains_key =
True 4634 self.logger.info(
" GlobalTag contains `%s' key? -> %s" % \
4635 (ref_hist_key, tag_contains_key))
4638 return tag_contains_key
4643 """Check the existence of tag_name in database connect_name. 4645 Check if tag_name exists as a reference histogram tag in the 4646 database given by self.frontier_connection_name['refhists']. 4650 connect_name = self.frontier_connection_name[
"refhists"]
4651 connect_name += self.db_account_name_cms_cond_dqm_summary()
4653 self.logger.debug(
"Checking existence of reference " \
4654 "histogram tag `%s'" % \
4656 self.logger.debug(
" (Using database connection `%s')" % \
4659 cmd =
"cmscond_list_iov -c %s" % \
4661 (status, output) = commands.getstatusoutput(cmd)
4663 msg =
"Could not check existence of tag `%s' in `%s'" % \
4664 (tag_name, connect_name)
4665 self.logger.fatal(msg)
4666 self.logger.debug(
"Command used:")
4667 self.logger.debug(
" %s" % cmd)
4668 self.logger.debug(
"Output received:")
4669 self.logger.debug(output)
4671 if not tag_name
in output.split():
4672 self.logger.debug(
"Reference histogram tag `%s' " \
4673 "does not exist in `%s'" % \
4674 (tag_name, connect_name))
4675 self.logger.debug(
" Existing tags: `%s'" % \
4676 "', `".
join(output.split()))
4680 self.logger.debug(
" Reference histogram tag exists? " \
4681 "-> %s" % tag_exists)
4689 """Build the es_prefer snippet for the reference histograms. 4691 The building of the snippet is wrapped in some care-taking 4692 code that figures out the name of the reference histogram set 4693 and makes sure the corresponding tag exists. 4699 ref_hist_tag_name = self.ref_hist_mappings[dataset_name]
4701 connect_name = self.frontier_connection_name[
"refhists"]
4702 connect_name += self.db_account_name_cms_cond_dqm_summary()
4703 record_name =
"DQMReferenceHistogramRootFileRcd" 4707 code_lines.append(
"from CondCore.DBCommon.CondDBSetup_cfi import *")
4708 code_lines.append(
"process.ref_hist_source = cms.ESSource(\"PoolDBESSource\", CondDBSetup,")
4709 code_lines.append(
" connect = cms.string(\"%s\")," % connect_name)
4710 code_lines.append(
" toGet = cms.VPSet(cms.PSet(record = cms.string(\"%s\")," % record_name)
4711 code_lines.append(
" tag = cms.string(\"%s\"))," % ref_hist_tag_name)
4712 code_lines.append(
" )")
4713 code_lines.append(
" )")
4714 code_lines.append(
"process.es_prefer_ref_hist_source = cms.ESPrefer(\"PoolDBESSource\", \"ref_hist_source\")")
4716 snippet =
"\n".
join(code_lines)
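    # For reference, the snippet assembled above renders (with a
    # hypothetical connection string and tag) roughly as:
    #
    #   from CondCore.DBCommon.CondDBSetup_cfi import *
    #   process.ref_hist_source = cms.ESSource("PoolDBESSource", CondDBSetup,
    #       connect = cms.string("frontier://..."),
    #       toGet = cms.VPSet(cms.PSet(record = cms.string("DQMReferenceHistogramRootFileRcd"),
    #                                  tag = cms.string("some_ref_hist_tag"))),
    #       )
    #   process.es_prefer_ref_hist_source = cms.ESPrefer("PoolDBESSource", "ref_hist_source")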
4724 """Create the Python harvesting configuration for harvesting. 4726 The basic configuration is created by 4727 Configuration.PyReleaseValidation.ConfigBuilder. (This mimics 4728 what cmsDriver.py does.) After that we add some specials 4731 NOTE: On one hand it may not be nice to circumvent 4732 cmsDriver.py, on the other hand cmsDriver.py does not really 4733 do anything itself. All the real work is done by the 4734 ConfigBuilder so there is not much risk that we miss out on 4735 essential developments of cmsDriver in the future. 4740 config_options = defaultOptions
4745 config_options.name =
"harvesting" 4746 config_options.scenario =
"pp" 4747 config_options.number = 1
4748 config_options.arguments = self.ident_string()
4749 config_options.evt_type = config_options.name
4750 config_options.customisation_file =
None 4751 config_options.filein =
"dummy_value" 4752 config_options.filetype =
"EDM" 4754 config_options.gflash =
"dummy_value" 4758 config_options.dbsquery =
"" 4765 config_options.step =
"HARVESTING:%s" % \
4766 self.harvesting_info[self.harvesting_type] \
4768 config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4770 config_options.eventcontent = self.harvesting_info \
4771 [self.harvesting_type] \
4773 config_options.harvesting = self.harvesting_info \
4774 [self.harvesting_type] \
4781 datatype = self.datasets_information[dataset_name][
"datatype"]
4782 config_options.isMC = (datatype.lower() ==
"mc")
4783 config_options.isData = (datatype.lower() ==
"data")
4784 globaltag = self.datasets_information[dataset_name][
"globaltag"]
4786 config_options.conditions = self.format_conditions_string(globaltag)
4790 if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4792 config_builder =
ConfigBuilder(config_options, with_input=
True)
4796 config_builder.prepare(
True)
4797 config_contents = config_builder.pythonCfgCode
4806 marker_lines.append(sep)
4807 marker_lines.append(
"# Code between these markers was generated by")
4808 marker_lines.append(
"# Configuration.PyReleaseValidation." \
4811 marker_lines.append(sep)
4812 marker =
"\n".
join(marker_lines)
4814 tmp = [self.config_file_header()]
4818 tmp.append(config_contents)
4822 config_contents =
"\n".
join(tmp)
4827 customisations = [
""]
4829 customisations.append(
"# Now follow some customisations")
4830 customisations.append(
"")
4831 connect_name = self.frontier_connection_name[
"globaltag"]
4832 connect_name += self.db_account_name_cms_cond_globaltag()
4833 customisations.append(
"process.GlobalTag.connect = \"%s\"" % \
4837 if self.saveByLumiSection ==
True:
4838 customisations.append(
"process.dqmSaver.saveByLumiSection = 1")
4842 customisations.append(
"")
4856 use_es_prefer = (self.harvesting_type ==
"RelVal")
4857 use_refs = use_es_prefer
or \
4858 (
not self.harvesting_type ==
"MC")
4860 use_refs = use_refs
and self.use_ref_hists
4868 customisations.append(
"print \"Not using reference histograms\"")
4869 customisations.append(
"if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4870 customisations.append(
" for (sequence_name, sequence) in six.iteritems(process.sequences):")
4871 customisations.append(
" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4872 customisations.append(
" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4873 customisations.append(
" sequence_name")
4874 customisations.append(
"process.dqmSaver.referenceHandling = \"skip\"")
4878 customisations.append(
"process.dqmSaver.referenceHandling = \"all\"")
4880 es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4881 customisations.append(es_prefer_snippet)
4885 workflow_name = dataset_name
4886 if self.harvesting_mode ==
"single-step-allow-partial":
4887 workflow_name +=
"_partial" 4888 customisations.append(
"process.dqmSaver.workflow = \"%s\"" % \
4925 config_contents = config_contents +
"\n".
join(customisations)
4930 return config_contents
    def create_me_extraction_config(self, dataset_name):
        """Create the Python configuration for the ME-extraction
        (first) step of two-step harvesting.

        """

        tmp = []

        tmp.append(self.config_file_header())
        tmp.append("")
        tmp.append("import FWCore.ParameterSet.Config as cms")
        tmp.append("")
        tmp.append("process = cms.Process(\"ME2EDM\")")
        tmp.append("")
        tmp.append("# Import of standard configurations")
        tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
        tmp.append("")
        tmp.append("# We don't really process any events, just keep this set to one to")
        tmp.append("# make sure things work.")
        tmp.append("process.maxEvents = cms.untracked.PSet(")
        tmp.append("    input = cms.untracked.int32(1)")
        tmp.append("    )")
        tmp.append("")
        tmp.append("process.options = cms.untracked.PSet(")
        tmp.append("    Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
        tmp.append("    )")
        tmp.append("")
        tmp.append("process.source = cms.Source(\"PoolSource\",")
        tmp.append("                            processingMode = \\")
        tmp.append("                            cms.untracked.string(\"RunsAndLumis\"),")
        tmp.append("                            fileNames = \\")
        tmp.append("                            cms.untracked.vstring(\"no_file_specified\")")
        tmp.append("                            )")
        tmp.append("")
        tmp.append("# Output definition: drop everything except for the monitoring.")
        tmp.append("process.output = cms.OutputModule(")
        tmp.append("    \"PoolOutputModule\",")
        tmp.append("    outputCommands = \\")
        tmp.append("    cms.untracked.vstring(\"drop *\", \\")
        tmp.append("                          \"keep *_MEtoEDMConverter_*_*\"),")
        output_file_name = self. \
                           create_me_summary_output_file_name(dataset_name)
        tmp.append("    fileName = \\")
        tmp.append("    cms.untracked.string(\"%s\")," % output_file_name)
        tmp.append("    dataset = cms.untracked.PSet(")
        tmp.append("        dataTier = cms.untracked.string(\"RECO\"),")
        tmp.append("        filterName = cms.untracked.string(\"\")")
        tmp.append("        )")
        tmp.append("    )")
        tmp.append("")
        tmp.append("# Additional output definition")
        tmp.append("process.out_step = cms.EndPath(process.output)")
        tmp.append("")
        tmp.append("# Schedule definition")
        tmp.append("process.schedule = cms.Schedule(process.out_step)")
        tmp.append("")

        config_contents = "\n".join(tmp)

        return config_contents
5047 """Write a CRAB job configuration Python file. 5051 self.logger.info(
"Writing CRAB configuration...")
5053 file_name_base =
"crab.cfg" 5056 crab_contents = self.create_crab_config()
5059 crab_file_name = file_name_base
5061 crab_file =
file(crab_file_name,
"w")
5062 crab_file.write(crab_contents)
5065 self.logger.fatal(
"Could not write " \
5066 "CRAB configuration to file `%s'" % \
5068 raise Error(
"ERROR: Could not write to file `%s'!" % \
5076 """Write a multi-CRAB job configuration Python file. 5080 self.logger.info(
"Writing multi-CRAB configuration...")
5082 file_name_base =
"multicrab.cfg" 5085 multicrab_contents = self.create_multicrab_config()
5088 multicrab_file_name = file_name_base
5090 multicrab_file =
file(multicrab_file_name,
"w")
5091 multicrab_file.write(multicrab_contents)
5092 multicrab_file.close()
5094 self.logger.fatal(
"Could not write " \
5095 "multi-CRAB configuration to file `%s'" % \
5096 multicrab_file_name)
5097 raise Error(
"ERROR: Could not write to file `%s'!" % \
5098 multicrab_file_name)
5105 """Write a harvesting job configuration Python file. 5107 NOTE: This knows nothing about single-step or two-step 5108 harvesting. That's all taken care of by 5109 create_harvesting_config. 5113 self.logger.debug(
"Writing harvesting configuration for `%s'..." % \
5117 config_contents = self.create_harvesting_config(dataset_name)
5120 config_file_name = self. \
5123 config_file =
file(config_file_name,
"w")
5124 config_file.write(config_contents)
5127 self.logger.fatal(
"Could not write " \
5128 "harvesting configuration to file `%s'" % \
5130 raise Error(
"ERROR: Could not write to file `%s'!" % \
5138 """Write an ME-extraction configuration Python file. 5140 This `ME-extraction' (ME = Monitoring Element) is the first 5141 step of the two-step harvesting. 5145 self.logger.debug(
"Writing ME-extraction configuration for `%s'..." % \
5149 config_contents = self.create_me_extraction_config(dataset_name)
5152 config_file_name = self. \
5155 config_file =
file(config_file_name,
"w")
5156 config_file.write(config_contents)
5159 self.logger.fatal(
"Could not write " \
5160 "ME-extraction configuration to file `%s'" % \
5162 raise Error(
"ERROR: Could not write to file `%s'!" % \
5171 """Check if we need to load and check the reference mappings. 5173 For data the reference histograms should be taken 5174 automatically from the GlobalTag, so we don't need any 5175 mappings. For RelVals we need to know a mapping to be used in 5176 the es_prefer code snippet (different references for each of 5179 WARNING: This implementation is a bit convoluted. 5185 if not dataset_name
is None:
5186 data_type = self.datasets_information[dataset_name] \
5188 mappings_needed = (data_type ==
"mc")
5190 if not mappings_needed:
5191 assert data_type ==
"data" 5194 tmp = [self.ref_hist_mappings_needed(dataset_name) \
5195 for dataset_name
in \
5196 self.datasets_information.keys()]
5197 mappings_needed = (
True in tmp)
5200 return mappings_needed
5205 """Load the reference histogram mappings from file. 5207 The dataset name to reference histogram name mappings are read 5208 from a text file specified in self.ref_hist_mappings_file_name. 5213 assert len(self.ref_hist_mappings) < 1, \
5214 "ERROR Should not be RE-loading " \
5215 "reference histogram mappings!" 5218 self.logger.info(
"Loading reference histogram mappings " \
5219 "from file `%s'" % \
5220 self.ref_hist_mappings_file_name)
5222 mappings_lines =
None 5224 mappings_file =
file(self.ref_hist_mappings_file_name,
"r") 5225 mappings_lines = mappings_file.readlines() 5226 mappings_file.close() 5228 msg =
"ERROR: Could not open reference histogram mapping "\
5229 "file `%s'" % self.ref_hist_mappings_file_name
5230 self.logger.fatal(msg)
5240 for mapping
in mappings_lines:
5242 if not mapping.startswith(
"#"):
5243 mapping = mapping.strip()
5244 if len(mapping) > 0:
5245 mapping_pieces = mapping.split()
5246 if len(mapping_pieces) != 2:
5247 msg =
"ERROR: The reference histogram mapping " \
5248 "file contains a line I don't " \
5249 "understand:\n %s" % mapping
5250 self.logger.fatal(msg)
5252 dataset_name = mapping_pieces[0].
strip()
5253 ref_hist_name = mapping_pieces[1].
strip()
5257 if dataset_name
in self.ref_hist_mappings:
5258 msg =
"ERROR: The reference histogram mapping " \
5259 "file contains multiple mappings for " \
5261 self.logger.fatal(msg)
5265 self.ref_hist_mappings[dataset_name] = ref_hist_name
5269 self.logger.info(
" Successfully loaded %d mapping(s)" % \
5270 len(self.ref_hist_mappings))
5271 max_len =
max([len(i)
for i
in self.ref_hist_mappings.keys()])
5272 for (map_from, map_to)
in six.iteritems(self.ref_hist_mappings):
5273 self.logger.info(
" %-*s -> %s" % \
5274 (max_len, map_from, map_to))
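    # Illustration (hypothetical names): the mappings file parsed above is a
    # plain text file with two whitespace-separated columns per line, the
    # dataset name followed by the reference histogram tag; lines starting
    # with `#' are ignored:
    #
    #   # dataset name                              reference histogram tag
    #   /RelValMinBias/CMSSW_3_8_2-v1/GEN-SIM-RECO  ref_hist_minbias_382
    #   /RelValTTbar/CMSSW_3_8_2-v1/GEN-SIM-RECO    ref_hist_ttbar_382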
5281 """Make sure all necessary reference histograms exist. 5283 Check that for each of the datasets to be processed a 5284 reference histogram is specified and that that histogram 5285 exists in the database. 5287 NOTE: There's a little complication here. Since this whole 5288 thing was designed to allow (in principle) harvesting of both 5289 data and MC datasets in one go, we need to be careful to check 5290 the availability fof reference mappings only for those 5291 datasets that need it. 5295 self.logger.info(
"Checking reference histogram mappings")
5297 for dataset_name
in self.datasets_to_use:
5299 ref_hist_name = self.ref_hist_mappings[dataset_name]
5301 msg =
"ERROR: No reference histogram mapping found " \
5302 "for dataset `%s'" % \
5304 self.logger.fatal(msg)
5307 if not self.check_ref_hist_tag(ref_hist_name):
5308 msg =
"Reference histogram tag `%s' " \
5309 "(used for dataset `%s') does not exist!" % \
5310 (ref_hist_name, dataset_name)
5311 self.logger.fatal(msg)
5314 self.logger.info(
" Done checking reference histogram mappings.")
5321 """Obtain all information on the datasets that we need to run. 5323 Use DBS to figure out all required information on our 5324 datasets, like the run numbers and the GlobalTag. All 5325 information is stored in the datasets_information member 5340 self.datasets_information = {}
5341 self.logger.info(
"Collecting information for all datasets to process")
5342 dataset_names = sorted(self.datasets_to_use.keys())
5343 for dataset_name
in dataset_names:
5347 self.logger.info(sep_line)
5348 self.logger.info(
" `%s'" % dataset_name)
5349 self.logger.info(sep_line)
5351 runs = self.dbs_resolve_runs(dataset_name)
5352 self.logger.info(
" found %d run(s)" % len(runs))
5354 self.logger.debug(
" run number(s): %s" % \
5355 ", ".
join([
str(i)
for i
in runs]))
5359 self.logger.warning(
" --> skipping dataset " 5361 assert False,
"Panic: found a dataset without runs " \
5365 cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5366 self.logger.info(
" found CMSSW version `%s'" % cmssw_version)
5369 datatype = self.dbs_resolve_datatype(dataset_name)
5370 self.logger.info(
" sample is data or MC? --> %s" % \
5376 if self.globaltag
is None:
5377 globaltag = self.dbs_resolve_globaltag(dataset_name)
5379 globaltag = self.globaltag
5381 self.logger.info(
" found GlobalTag `%s'" % globaltag)
5387 assert datatype ==
"data", \
5388 "ERROR Empty GlobalTag for MC dataset!!!" 5396 sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5400 for run_number
in sites_catalog.keys():
5401 num_events[run_number] = sites_catalog \
5402 [run_number][
"all_sites"]
5403 del sites_catalog[run_number][
"all_sites"]
5408 for run_number
in sites_catalog.keys():
5409 mirror_catalog[run_number] = sites_catalog \
5410 [run_number][
"mirrored"]
5411 del sites_catalog[run_number][
"mirrored"]
5440 self.datasets_information[dataset_name] = {}
5441 self.datasets_information[dataset_name][
"runs"] = runs
5442 self.datasets_information[dataset_name][
"cmssw_version"] = \
5444 self.datasets_information[dataset_name][
"globaltag"] = globaltag
5445 self.datasets_information[dataset_name][
"datatype"] = datatype
5446 self.datasets_information[dataset_name][
"num_events"] = num_events
5447 self.datasets_information[dataset_name][
"mirrored"] = mirror_catalog
5448 self.datasets_information[dataset_name][
"sites"] = sites_catalog
5452 castor_path_common = self.create_castor_path_name_common(dataset_name)
5453 self.logger.info(
" output will go into `%s'" % \
5457 [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5459 for path_name
in castor_paths.values():
5460 self.logger.debug(
" %s" % path_name)
5461 self.datasets_information[dataset_name][
"castor_path"] = \
5469 """Tell the user what to do now, after this part is done. 5471 This should provide the user with some (preferably 5472 copy-pasteable) instructions on what to do now with the setups 5473 and files that have been created. 5483 self.logger.info(
"")
5484 self.logger.info(sep_line)
5485 self.logger.info(
" Configuration files have been created.")
5486 self.logger.info(
" From here on please follow the usual CRAB instructions.")
5487 self.logger.info(
" Quick copy-paste instructions are shown below.")
5488 self.logger.info(sep_line)
5490 self.logger.info(
"")
5491 self.logger.info(
" Create all CRAB jobs:")
5492 self.logger.info(
" multicrab -create")
5493 self.logger.info(
"")
5494 self.logger.info(
" Submit all CRAB jobs:")
5495 self.logger.info(
" multicrab -submit")
5496 self.logger.info(
"")
5497 self.logger.info(
" Check CRAB status:")
5498 self.logger.info(
" multicrab -status")
5499 self.logger.info(
"")
5501 self.logger.info(
"")
5502 self.logger.info(
" For more information please see the CMS Twiki:")
5503 self.logger.info(
" %s" % twiki_url)
5504 self.logger.info(sep_line)
5508 if not self.all_sites_found:
5509 self.logger.warning(
" For some of the jobs no matching " \
5510 "site could be found")
5511 self.logger.warning(
" --> please scan your multicrab.cfg" \
5512 "for occurrences of `%s'." % \
5513 self.no_matching_site_found_str)
5514 self.logger.warning(
" You will have to fix those " \
5522 "Main entry point of the CMS harvester." 5532 self.parse_cmd_line_options()
5534 self.check_input_status()
5547 self.setup_harvesting_info()
5550 self.build_dataset_use_list()
5552 self.build_dataset_ignore_list()
5555 self.build_runs_use_list()
5556 self.build_runs_ignore_list()
5563 self.process_dataset_ignore_list()
5567 self.build_datasets_information()
5569 if self.use_ref_hists
and \
5570 self.ref_hist_mappings_needed():
5573 self.load_ref_hist_mappings()
5577 self.check_ref_hist_mappings()
5579 self.logger.info(
"No need to load reference " \
5580 "histogram mappings file")
5595 self.process_runs_use_and_ignore_lists()
5600 if self.harvesting_mode ==
"single-step-allow-partial":
5601 self.singlify_datasets()
5604 self.check_dataset_list()
5606 if len(self.datasets_to_use) < 1:
5607 self.logger.info(
"After all checks etc. " \
5608 "there are no datasets (left?) " \
5612 self.logger.info(
"After all checks etc. we are left " \
5613 "with %d dataset(s) to process " \
5614 "for a total of %d runs" % \
5615 (len(self.datasets_to_use),
5616 sum([len(i)
for i
in \
5617 self.datasets_to_use.values()])))
5644 self.create_and_check_castor_dirs()
5648 self.write_crab_config()
5649 self.write_multicrab_config()
5659 for dataset_name
in self.datasets_to_use.keys():
5661 self.write_harvesting_config(dataset_name)
5662 if self.harvesting_mode ==
"two-step":
5663 self.write_me_extraction_config(dataset_name)
5671 for run_number
in self.datasets_to_use[dataset_name]:
5672 tmp[run_number] = self.datasets_information \
5673 [dataset_name][
"num_events"][run_number]
5674 if dataset_name
in self.book_keeping_information:
5675 self.book_keeping_information[dataset_name].
update(tmp)
5677 self.book_keeping_information[dataset_name] = tmp
5680 self.show_exit_message()
5682 except Usage
as err:
5687 except Error
as err:
5691 except Exception
as err:
5700 if isinstance(err, SystemExit):
5701 self.logger.fatal(err.code)
5702 elif not isinstance(err, KeyboardInterrupt):
5703 self.logger.fatal(
"!" * 50)
5704 self.logger.fatal(
" This looks like a serious problem.")
5705 self.logger.fatal(
" If you are sure you followed all " \
5707 self.logger.fatal(
" please copy the below stack trace together")
5708 self.logger.fatal(
" with a description of what you were doing to")
5709 self.logger.fatal(
" jeroen.hegeman@cern.ch.")
5710 self.logger.fatal(
" %s" % self.ident_string())
5711 self.logger.fatal(
"!" * 50)
5712 self.logger.fatal(
str(err))
5714 traceback_string = traceback.format_exc()
5715 for line
in traceback_string.split(
"\n"):
5716 self.logger.fatal(line)
5717 self.logger.fatal(
"!" * 50)
5732 if self.crab_submission ==
True:
5733 os.system(
"multicrab -create")
5734 os.system(
"multicrab -submit")
5745 if __name__ ==
"__main__":
5746 "Main entry point for harvesting." def create_crab_config(self)