"""Main program to run all kinds of harvesting.

These are the basic kinds of harvesting implemented (contact me if
your favourite is missing):

- RelVal : Run for release validation samples. Makes heavy use of MC.

- RelValFS : FastSim RelVal.

- MC : Run for MC samples.

- DQMOffline : Run for real data (could also be run for MC).

For the mappings of these harvesting types to sequence names please
see the setup_harvesting_info() and option_handler_list_types()
methods.

"""

__version__ = "3.8.2p1"
__author__ = "Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
             "Niklas Pietsch (niklas.pietsch@desy.de)"

twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"

from inspect import getargspec
from random import choice

from DBSAPI.dbsApi import DbsApi

from functools import reduce

global SAXParseException
from xml.sax import SAXParseException

import Configuration.PyReleaseValidation
from Configuration.PyReleaseValidation.ConfigBuilder import \
     ConfigBuilder, defaultOptions
    # __str__ bodies of the Usage and Error exception classes raised
    # elsewhere in this script.
        return repr(self.msg)

        return repr(self.msg)
    """Helper class to add some customised help output to cmsHarvester.

    We want to add some instructions, as well as a pointer to the CMS
    Twiki.

    """

        usage_lines.append(sep_line)
        usage_lines.append("Welcome to the CMS harvester, a (hopefully useful)")
        usage_lines.append("tool to create harvesting configurations.")
        usage_lines.append("For more information please have a look at the CMS Twiki:")
        usage_lines.append("  %s" % twiki_url)
        usage_lines.append(sep_line)
        usage_lines.append("")
        usage_lines.append(optparse.IndentedHelpFormatter. \
                           format_usage(self, usage))

        formatted_usage = "\n".join(usage_lines)
        return formatted_usage
    """XML handler class to parse DBS results.

    The tricky thing here is that older DBS versions (2.0.5 and
    earlier) return results in a different XML format than newer
    versions. Previously the result values were returned as attributes
    to the `result' element. The new approach returns result values as
    contents of named elements.

    The old approach is handled directly in startElement(), the new
    approach in characters().

    NOTE: All results are returned in the form of string values.

    """

    mapping = {
        "dataset.tag"    : "PROCESSEDDATASET_GLOBALTAG",
        "datatype.type"  : "PRIMARYDSTYPE_TYPE",
        "run"            : "RUNS_RUNNUMBER",
        "run.number"     : "RUNS_RUNNUMBER",
        "file.name"      : "FILES_LOGICALFILENAME",
        "file.numevents" : "FILES_NUMBEROFEVENTS",
        "algo.version"   : "APPVERSION_VERSION",
        "site"           : "STORAGEELEMENT_SENAME",
        }
        self.element_position.append(name)

            key = DBSXMLHandler.mapping[name]
            value = str(attrs[key])

                "closing unopened element `%s'" % name

        self.element_position.pop()

            self.current_value.append(content)

        """Make sure that all results arrays have equal length.

        We should have received complete rows from DBS. I.e. all
        results arrays in the handler should be of equal length.

        """

        res_names = self.results.keys()
        if len(res_names) > 1:
            for res_name in res_names[1:]:
                res_tmp = self.results[res_name]
                if len(res_tmp) != len(self.results[res_names[0]]):
                    results_valid = False

    """Class to perform CMS harvesting.

    More documentation `obviously' to follow.

    """

        "Initialize class and process command line options."

            "single-step-allow-partial",
        for key in self.frontier_connection_name.keys():

        self.Jsonlumi = False

        self.castor_base_dir = None
        self.castor_base_dir_default = "/castor/cern.ch/" \
                                       "dqm/offline/harvesting_output/"

        self.book_keeping_file_name_default = "harvesting_accounting.txt"

        self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"

        self.castor_prefix = "/castor/cern.ch"

        self.non_t1access = False
        self.caf_access = False
        self.saveByLumiSection = False
        self.crab_submission = False
        self.nr_max_sites = 1

        self.preferred_site = "no preference"

        self.datasets_to_use = {}
        self.datasets_to_ignore = {}
        self.book_keeping_information = {}
        self.ref_hist_mappings = {}
        self.runs_to_use = {}
        self.runs_to_ignore = {}
        self.sites_and_versions_cache = {}
        self.globaltag_check_cache = []
        self.all_sites_found = True
        self.no_matching_site_found_str = "no_matching_site_found"

        if cmd_line_opts is None:
            cmd_line_opts = sys.argv[1:]
        self.cmd_line_opts = cmd_line_opts

        log_handler = logging.StreamHandler()
        log_formatter = logging.Formatter("%(message)s")
        log_handler.setFormatter(log_formatter)
        logger = logging.getLogger()
        logger.addHandler(log_handler)
        "Clean up after ourselves."

        "Create a timestamp to use in the created config files."

        time_now = datetime.datetime.utcnow()
        time_now = time_now.replace(microsecond = 0)
        time_stamp = "%sUTC" % datetime.datetime.isoformat(time_now)

        "Spit out an identification string for cmsHarvester.py."

        ident_str = "`cmsHarvester.py " \
                    "version %s': cmsHarvester.py %s" % \
                    (__version__,
                     reduce(lambda x, y: x + ' ' + y, sys.argv[1:]))

        """Create the conditions string needed for `cmsDriver'.

        Just gluing the FrontierConditions bit in front of it really.

        """

        if globaltag.lower().find("conditions") > -1:
            conditions_string = globaltag
        else:
            conditions_string = "FrontierConditions_GlobalTag,%s" % \
                                globaltag

        return conditions_string
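        # Illustrative example (the tag name is made up): with the
        # logic above, a GlobalTag like "GR10_P_V4::All" becomes
        #   "FrontierConditions_GlobalTag,GR10_P_V4::All"
        # while a value that already contains the word "conditions"
        # is passed through unchanged.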
        """Return the database account name used to store the GlobalTag.

        The name of the database account depends (albeit weakly) on
        the CMSSW release version.

        """

        account_name = "CMS_COND_31X_GLOBALTAG"

        """See db_account_name_cms_cond_globaltag."""

        version = self.cmssw_version[6:11]
        if version < "3_4_0":
            account_name = "CMS_COND_31X_DQM_SUMMARY"
        else:
            account_name = "CMS_COND_34X"

        "Create a nice header to be used to mark the generated files."

        tmp.append("# %s" % time_stamp)
        tmp.append("# WARNING: This file was created automatically!")
        tmp.append("# Created by %s" % ident_str)

        header = "\n".join(tmp)
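        # Example of the header this produces (timestamp and command
        # line are illustrative values):
        #
        #   # 2011-02-03T12:34:56UTC
        #   # WARNING: This file was created automatically!
        #   # Created by `cmsHarvester.py version 3.8.2p1': cmsHarvester.py --dataset ...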
        """Adjust the level of output generated.

        - normal  : default level of output
        - quiet   : less output than the default
        - verbose : some additional information
        - debug   : lots more information, may be overwhelming

        NOTE: The debug option is a bit special in the sense that it
        also modifies the output format.

        """

            "NORMAL"  : logging.INFO,
            "QUIET"   : logging.WARNING,
            "VERBOSE" : logging.INFO,
            "DEBUG"   : logging.DEBUG

        output_level = output_level.upper()

        self.logger.fatal("Unknown output level `%s'" % output_level)
        """Switch to debug mode.

        This both increases the amount of output generated, as well as
        changes the format used (more detailed information is given).

        """

        log_formatter_debug = logging.Formatter("[%(levelname)s] " \

        log_handler = self.logger.handlers[0]
        log_handler.setFormatter(log_formatter_debug)
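        # Note (illustrative, based on the two formatters above): in
        # normal mode the root logger prints bare messages
        # ("%(message)s"), while in debug mode each line is prefixed
        # with its level, e.g.
        #   [DEBUG] Checking CASTOR path `/castor/cern.ch/...'
        # The full debug format string continues beyond the
        # "[%(levelname)s] " prefix shown above.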
        "Switch to quiet mode: less verbose."

        """Switch on `force mode' in which case we don't brake for nobody.

        In so-called `force mode' all sanity checks are performed but
        we don't halt on failure. Of course this requires some care.

        """

        self.logger.debug("Switching on `force mode'.")
        """Set the harvesting type to be used.

        This checks that no harvesting type is already set, and sets
        the harvesting type to be used to the one specified. If a
        harvesting type is already set an exception is thrown. The
        same happens when an unknown type is specified.

        """

        value = value.lower()
        type_index = harvesting_types_lowered.index(value)

        self.logger.fatal("Unknown harvesting type `%s'" % \
                          value)
        self.logger.fatal("  possible types are: %s" %
                          self.harvesting_types)
        raise Usage("Unknown harvesting type `%s'" % \
                    value)

        msg = "Only one harvesting type should be specified"
        self.logger.fatal(msg)

        self.logger.info("Harvesting type to be used: `%s'" % \
                         self.harvesting_type)
        """Set the harvesting mode to be used.

        Single-step harvesting can be used for samples that are
        located completely at a single site (= SE). Otherwise use
        two-step mode.

        """

        harvesting_mode = value.lower()

        msg = "Unknown harvesting mode `%s'" % harvesting_mode
        self.logger.fatal(msg)
        self.logger.fatal("  possible modes are: %s" % \
                          self.harvesting_modes)

        msg = "Only one harvesting mode should be specified"
        self.logger.fatal(msg)

        self.logger.info("Harvesting mode to be used: `%s'" % \
                         self.harvesting_mode)
        """Set the GlobalTag to be used, overriding our own choices.

        By default the cmsHarvester will use the GlobalTag with which
        a given dataset was created also for the harvesting. The
        --globaltag option is the way to override this behaviour.

        """

        msg = "Only one GlobalTag should be specified"
        self.logger.fatal(msg)

        self.logger.info("GlobalTag to be used: `%s'" % \
                         self.globaltag)

        "Switch use of all reference histograms off."

        self.logger.warning("Switching off all use of reference histograms")
        """Override the default Frontier connection string.

        Please only use this for testing (e.g. when a test payload has
        been inserted into cms_orc_off instead of cms_orc_on).

        This method gets called for three different command line
        options:
        - --frontier-connection,
        - --frontier-connection-for-globaltag,
        - --frontier-connection-for-refhists.
        Appropriate care has to be taken to make sure things are only
        done once.

        """

        frontier_type = opt_str.split("-")[-1]
        if frontier_type == "connection":
            frontier_types = self.frontier_connection_name.keys()
        else:
            frontier_types = [frontier_type]

        for connection_name in frontier_types:
            msg = "Please specify either:\n" \
                  "  `--frontier-connection' to change the " \
                  "Frontier connection used for everything, or\n" \
                  "either one or both of\n" \
                  "  `--frontier-connection-for-globaltag' to " \
                  "change the Frontier connection used for the " \
                  "GlobalTag and\n" \
                  "  `--frontier-connection-for-refhists' to change " \
                  "the Frontier connection used for the " \
                  "reference histograms."
            self.logger.fatal(msg)

        frontier_prefix = "frontier://"
        if not value.startswith(frontier_prefix):
            msg = "Expecting Frontier connections to start with " \
                  "`%s'. You specified `%s'." % \
                  (frontier_prefix, value)
            self.logger.fatal(msg)
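        # Illustrative example (the value is made up): a connection
        # string that satisfies the checks in this handler starts with
        # "frontier://", mentions FrontierProd or FrontierPrep, and
        # ends with a slash, e.g.
        #   --frontier-connection-for-globaltag frontier://FrontierProd/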
        if value.find("FrontierProd") < 0 and \
           value.find("FrontierPrep") < 0:
            msg = "Expecting Frontier connections to contain either " \
                  "`FrontierProd' or `FrontierPrep'. You specified " \
                  "`%s'. Are you sure?" % \
                  value
            self.logger.warning(msg)

        if not value.endswith("/"):

        for connection_name in frontier_types:
            frontier_type_str = "unknown"
            if connection_name == "globaltag":
                frontier_type_str = "the GlobalTag"
            elif connection_name == "refhists":
                frontier_type_str = "the reference histograms"

            self.logger.warning("Overriding default Frontier " \
                                "connection for %s " \
        if opt_str.lower().find("ignore") > -1:

        if opt_str.lower().find("dataset") > -1:
            select_type = "datasets"

        if not self.input_method[select_type][spec_type] is None:
            msg = "Please only specify one input method " \
                  "(for the `%s' case)" % opt_str
            self.logger.fatal(msg)

        input_method = opt_str.replace("-", "").replace("ignore", "")
        self.input_method[select_type][spec_type] = input_method
        self.input_name[select_type][spec_type] = value

        self.logger.debug("Input method for the `%s' case: %s" % \
                          (spec_type, input_method))
        """Store the name of the file to be used for book keeping.

        The only check done here is that only a single book keeping
        file is specified.

        """

        msg = "Only one book keeping file should be specified"
        self.logger.fatal(msg)

        self.logger.info("Book keeping file to be used: `%s'" % \
                         self.book_keeping_file_name)

        """Store the name of the file for the ref. histogram mapping."""

        msg = "Only one reference histogram mapping file " \
              "should be specified"
        self.logger.fatal(msg)

        self.logger.info("Reference histogram mapping file " \
                         "to be used: `%s'" % \
                         self.ref_hist_mappings_file_name)
        """Specify where on CASTOR the output should go.

        At the moment only output to CERN CASTOR is
        supported. Eventually the harvested results should go into the
        central place for DQM on CASTOR anyway.

        """

        castor_prefix = self.castor_prefix

        castor_dir = os.path.join(os.path.sep, castor_dir)
        self.castor_base_dir = os.path.normpath(castor_dir)

        self.logger.info("CASTOR (base) area to be used: `%s'" % \
                         self.castor_base_dir)
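        # Example of the normalisation above (the path is made up): a
        # value given without a leading slash or with doubled slashes
        # is cleaned up by os.path.join/os.path.normpath, e.g.
        #   "castor/cern.ch/user/j/jdoe//dqm"
        #     -> "/castor/cern.ch/user/j/jdoe/dqm"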
        """Set the self.no_t1access flag to try and create jobs that
        run without special `t1access' role.

        """

        self.non_t1access = True

        self.logger.warning("Running in `non-t1access' mode. " \
                            "Will try to create jobs that run " \
                            "without special rights but no " \
                            "further promises...")

        """Set the self.caf_access flag to try and create jobs that
        run on the CAF.

        """

        self.caf_access = True

        self.logger.warning("Running in `caf_access' mode. " \
                            "Will try to create jobs that run " \
                            "on the CAF but no " \
                            "further promises...")

        """Set process.dqmSaver.saveByLumiSection=1 in the harvesting config file."""

        self.saveByLumiSection = True

        self.logger.warning("Warning concerning saveByLumiSection option")

        """Crab jobs are not created and submitted automatically."""

        self.crab_submission = True

        self.nr_max_sites = value

        self.preferred_site = value
        """List all harvesting types and their mappings.

        This lists all implemented harvesting types with their
        corresponding mappings to sequence names. This had to be
        separated out from the help since it depends on the CMSSW
        version and was making things a bit of a mess.

        NOTE: There is no way (at least not that I could come up with)
        to code this in a neat generic way that can be read both by
        this method and by setup_harvesting_info(). Please try hard to
        keep these two methods in sync!

        """

        sep_line_short = "-" * 20

        print "The following harvesting types are available:"

        print "`RelVal' maps to:"
        print "  pre-3_3_0           : HARVESTING:validationHarvesting"
        print "  3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting"
        print "  Exceptions:"
        print "    3_3_0_pre1-4 : HARVESTING:validationHarvesting"
        print "    3_3_0_pre6   : HARVESTING:validationHarvesting"
        print "    3_4_0_pre1   : HARVESTING:validationHarvesting"

        print sep_line_short

        print "`RelValFS' maps to:"
        print "  always : HARVESTING:validationHarvestingFS"

        print sep_line_short

        print "`MC' maps to:"
        print "  always : HARVESTING:validationprodHarvesting"

        print sep_line_short

        print "`DQMOffline' maps to:"
        print "  always : HARVESTING:dqmHarvesting"

        """Fill our dictionary with all info needed to understand
        harvesting.

        This depends on the CMSSW version since at some point the
        names and sequences were modified.

        NOTE: There is no way (at least not that I could come up with)
        to code this in a neat generic way that can be read both by
        this method and by option_handler_list_types(). Please try
        hard to keep these two methods in sync!

        """
        assert not self.cmssw_version is None, \
               "ERROR setup_harvesting() requires " \
               "self.cmssw_version to be set!!!"

        harvesting_info = {}

        harvesting_info["DQMOffline"] = {}
        harvesting_info["DQMOffline"]["beamspot"] = None
        harvesting_info["DQMOffline"]["eventcontent"] = None
        harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"

        harvesting_info["RelVal"] = {}
        harvesting_info["RelVal"]["beamspot"] = None
        harvesting_info["RelVal"]["eventcontent"] = None
        harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"

        harvesting_info["RelValFS"] = {}
        harvesting_info["RelValFS"]["beamspot"] = None
        harvesting_info["RelValFS"]["eventcontent"] = None
        harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"

        harvesting_info["MC"] = {}
        harvesting_info["MC"]["beamspot"] = None
        harvesting_info["MC"]["eventcontent"] = None
        harvesting_info["MC"]["harvesting"] = "AtRunEnd"

        assert self.cmssw_version.startswith("CMSSW_")

        version = self.cmssw_version[6:]

        # RelVal harvesting sequence.
        if version < "3_3_0":
            step_string = "validationHarvesting"
        elif version in ["3_3_0_pre1", "3_3_0_pre2",
                         "3_3_0_pre3", "3_3_0_pre4",
                         "3_3_0_pre6", "3_4_0_pre1"]:
            step_string = "validationHarvesting"
        else:
            step_string = "validationHarvesting+dqmHarvesting"

        harvesting_info["RelVal"]["step_string"] = step_string

        assert not step_string is None, \
               "ERROR Could not decide a RelVal harvesting sequence " \
               "for CMSSW version %s" % self.cmssw_version

        # RelValFS harvesting sequence.
        step_string = "validationHarvestingFS"

        harvesting_info["RelValFS"]["step_string"] = step_string

        # MC harvesting sequence.
        step_string = "validationprodHarvesting"

        harvesting_info["MC"]["step_string"] = step_string

        assert not step_string is None, \
               "ERROR Could not decide a MC harvesting " \
               "sequence for CMSSW version %s" % self.cmssw_version

        # DQMOffline harvesting sequence.
        step_string = "dqmHarvesting"

        harvesting_info["DQMOffline"]["step_string"] = step_string

        self.harvesting_info = harvesting_info

        self.logger.info("Based on the CMSSW version (%s) " \
                         "I decided to use the `HARVESTING:%s' " \
                         "sequence for %s harvesting" % \
                         (self.cmssw_version,
                          self.harvesting_info[self.harvesting_type]["step_string"],
                          self.harvesting_type))
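        # Illustrative lookup (the release name is made up): with the
        # version logic above, a release like CMSSW_3_8_2 would give
        #   harvesting_info["RelVal"]["step_string"]
        #     -> "validationHarvesting+dqmHarvesting"
        #   harvesting_info["DQMOffline"]["step_string"]
        #     -> "dqmHarvesting"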
1327 """Build the common part of the output path to be used on 1330 This consists of the CASTOR area base path specified by the 1331 user and a piece depending on the data type (data vs. MC), the 1332 harvesting type and the dataset name followed by a piece 1333 containing the run number and event count. (See comments in 1334 create_castor_path_name_special for details.) This method 1335 creates the common part, without run number and event count. 1339 castor_path = self.castor_base_dir
1344 datatype = self.datasets_information[dataset_name][
"datatype"]
1345 datatype = datatype.lower()
1346 castor_path = os.path.join(castor_path, datatype)
1349 harvesting_type = self.harvesting_type
1350 harvesting_type = harvesting_type.lower()
1351 castor_path = os.path.join(castor_path, harvesting_type)
1361 release_version = self.cmssw_version
1362 release_version = release_version.lower(). \
1365 castor_path = os.path.join(castor_path, release_version)
1368 dataset_name_escaped = self.escape_dataset_name(dataset_name)
1369 castor_path = os.path.join(castor_path, dataset_name_escaped)
1373 castor_path = os.path.normpath(castor_path)
1381 dataset_name, run_number,
1382 castor_path_common):
1383 """Create the specialised part of the CASTOR output dir name. 1385 NOTE: To avoid clashes with `incremental harvesting' 1386 (re-harvesting when a dataset grows) we have to include the 1387 event count in the path name. The underlying `problem' is that 1388 CRAB does not overwrite existing output files so if the output 1389 file already exists CRAB will fail to copy back the output. 1391 NOTE: It's not possible to create different kinds of 1392 harvesting jobs in a single call to this tool. However, in 1393 principle it could be possible to create both data and MC jobs 1396 NOTE: The number of events used in the path name is the 1397 _total_ number of events in the dataset/run at the time of 1398 harvesting. If we're doing partial harvesting the final 1399 results will reflect lower statistics. This is a) the easiest 1400 to code and b) the least likely to lead to confusion if 1401 someone ever decides to swap/copy around file blocks between 1406 castor_path = castor_path_common
1411 castor_path = os.path.join(castor_path,
"run_%d" % run_number)
1419 castor_path = os.path.join(castor_path,
"nevents")
1423 castor_path = os.path.normpath(castor_path)
1431 """Make sure all required CASTOR output dirs exist. 1433 This checks the CASTOR base dir specified by the user as well 1434 as all the subdirs required by the current set of jobs. 1438 self.logger.info(
"Checking (and if necessary creating) CASTOR " \
1439 "output area(s)...")
1442 self.create_and_check_castor_dir(self.castor_base_dir)
1446 for (dataset_name, runs)
in six.iteritems(self.datasets_to_use):
1449 castor_dirs.append(self.datasets_information[dataset_name] \
1450 [
"castor_path"][run])
1451 castor_dirs_unique = sorted(set(castor_dirs))
1455 ndirs = len(castor_dirs_unique)
1456 step =
max(ndirs / 10, 1)
1457 for (i, castor_dir)
in enumerate(castor_dirs_unique):
1458 if (i + 1) % step == 0
or \
1460 self.logger.info(
" %d/%d" % \
1462 self.create_and_check_castor_dir(castor_dir)
1469 self.logger.debug(
"Checking if path `%s' is empty" % \
1471 cmd =
"rfdir %s" % castor_dir
1472 (status, output) = commands.getstatusoutput(cmd)
1474 msg =
"Could not access directory `%s'" \
1475 " !!! This is bad since I should have just" \
1476 " created it !!!" % castor_dir
1477 self.logger.fatal(msg)
1480 self.logger.warning(
"Output directory `%s' is not empty:" \
1481 " new jobs will fail to" \
1482 " copy back output" % \
1490 """Check existence of the give CASTOR dir, if necessary create 1493 Some special care has to be taken with several things like 1494 setting the correct permissions such that CRAB can store the 1495 output results. Of course this means that things like 1496 /castor/cern.ch/ and user/j/ have to be recognised and treated 1499 NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for 1502 NOTE: This method uses some slightly tricky caching to make 1503 sure we don't keep over and over checking the same base paths. 1510 def split_completely(path):
1511 (parent_path, name) = os.path.split(path)
1513 return (parent_path, )
1515 return split_completely(parent_path) + (name, )
1521 def extract_permissions(rfstat_output):
1522 """Parse the output from rfstat and return the 1523 5-digit permissions string.""" 1525 permissions_line = [i
for i
in output.split(
"\n") \
1526 if i.lower().
find(
"protection") > -1]
1527 regexp = re.compile(
".*\(([0123456789]{5})\).*")
1528 match = regexp.search(rfstat_output)
1529 if not match
or len(match.groups()) != 1:
1530 msg =
"Could not extract permissions " \
1531 "from output: %s" % rfstat_output
1532 self.logger.fatal(msg)
1534 permissions = match.group(1)
1551 castor_paths_dont_touch = {
1552 0: [
"/",
"castor",
"cern.ch",
"cms",
"store",
"temp",
1553 "dqm",
"offline",
"user"],
1554 -1: [
"user",
"store"]
1557 self.logger.debug(
"Checking CASTOR path `%s'" % castor_dir)
1562 castor_path_pieces = split_completely(castor_dir)
1568 check_sizes = sorted(castor_paths_dont_touch.keys())
1569 len_castor_path_pieces = len(castor_path_pieces)
1570 for piece_index
in xrange (len_castor_path_pieces):
1571 skip_this_path_piece =
False 1572 piece = castor_path_pieces[piece_index]
1575 for check_size
in check_sizes:
1577 if (piece_index + check_size) > -1:
1581 if castor_path_pieces[piece_index + check_size]
in castor_paths_dont_touch[check_size]:
1583 skip_this_path_piece =
True 1591 path = os.path.join(path, piece)
1598 if path
in self.castor_path_checks_cache:
1600 except AttributeError:
1602 self.castor_path_checks_cache = []
1603 self.castor_path_checks_cache.append(path)
1621 if not skip_this_path_piece:
1629 self.logger.debug(
"Checking if path `%s' exists" % \
1631 cmd =
"rfstat %s" % path
1632 (status, output) = commands.getstatusoutput(cmd)
1635 self.logger.debug(
"Creating path `%s'" % path)
1636 cmd =
"nsmkdir -m 775 %s" % path
1637 (status, output) = commands.getstatusoutput(cmd)
1639 msg =
"Could not create directory `%s'" % path
1640 self.logger.fatal(msg)
1642 cmd =
"rfstat %s" % path
1643 (status, output) = commands.getstatusoutput(cmd)
1648 permissions = extract_permissions(output)
1649 if not permissions.startswith(
"40"):
1650 msg =
"Path `%s' is not a directory(?)" % path
1651 self.logger.fatal(msg)
1656 self.logger.debug(
"Checking permissions for path `%s'" % path)
1657 cmd =
"rfstat %s" % path
1658 (status, output) = commands.getstatusoutput(cmd)
1660 msg =
"Could not obtain permissions for directory `%s'" % \
1662 self.logger.fatal(msg)
1665 permissions = extract_permissions(output)[-3:]
1669 if piece_index == (len_castor_path_pieces - 1):
1672 permissions_target =
"775" 1675 permissions_target =
"775" 1678 permissions_new = []
1679 for (i, j)
in zip(permissions, permissions_target):
1681 permissions_new =
"".
join(permissions_new)
1682 self.logger.debug(
" current permissions: %s" % \
1684 self.logger.debug(
" target permissions : %s" % \
1686 if permissions_new != permissions:
1688 self.logger.debug(
"Changing permissions of `%s' " \
1689 "to %s (were %s)" % \
1690 (path, permissions_new, permissions))
1691 cmd =
"rfchmod %s %s" % (permissions_new, path)
1692 (status, output) = commands.getstatusoutput(cmd)
1694 msg =
"Could not change permissions for path `%s' " \
1695 "to %s" % (path, permissions_new)
1696 self.logger.fatal(msg)
1699 self.logger.debug(
" Permissions ok (%s)" % permissions_new)
1708 sites_forbidden = []
1710 if (self.preferred_site ==
"CAF")
or (self.preferred_site ==
"caf.cern.ch"):
1711 self.caf_access =
True 1713 if self.caf_access ==
False:
1714 sites_forbidden.append(
"caf.cern.ch")
1725 "cmssrm-fzk.gridka.de",
1727 "gridka-dCache.fzk.de",
1728 "srm-cms.gridpp.rl.ac.uk",
1729 "srm.grid.sinica.edu.tw",
1730 "srm2.grid.sinica.edu.tw",
1732 "storm-fe-cms.cr.cnaf.infn.it" 1736 "CAF" :
"caf.cern.ch",
1737 "CH" :
"srm-cms.cern.ch",
1738 "FR" :
"ccsrm.in2p3.fr",
1739 "DE" :
"cmssrm-fzk.gridka.de",
1740 "GOV" :
"cmssrm.fnal.gov",
1741 "DE2" :
"gridka-dCache.fzk.de",
1742 "UK" :
"srm-cms.gridpp.rl.ac.uk",
1743 "TW" :
"srm.grid.sinica.edu.tw",
1744 "TW2" :
"srm2.grid.sinica.edu.tw",
1745 "ES" :
"srmcms.pic.es",
1746 "IT" :
"storm-fe-cms.cr.cnaf.infn.it" 1749 if self.non_t1access:
1750 sites_forbidden.extend(all_t1)
1752 for site
in sites_forbidden:
1756 if self.preferred_site
in country_codes:
1757 self.preferred_site = country_codes[self.preferred_site]
1759 if self.preferred_site !=
"no preference":
1760 if self.preferred_site
in sites:
1761 sites = [self.preferred_site]
1774 while len(sites) > 0
and \
1781 t1_sites.append(site)
1782 if site ==
"caf.cern.ch":
1783 t1_sites.append(site)
1790 if len(t1_sites) > 0:
1791 se_name = choice(t1_sites)
1794 se_name = choice(sites)
1798 if se_name
in self.sites_and_versions_cache
and \
1799 cmssw_version
in self.sites_and_versions_cache[se_name]:
1800 if self.sites_and_versions_cache[se_name][cmssw_version]:
1804 self.logger.debug(
" --> rejecting site `%s'" % se_name)
1805 sites.remove(se_name)
1808 self.logger.info(
"Checking if site `%s' " \
1809 "has CMSSW version `%s'" % \
1810 (se_name, cmssw_version))
1811 self.sites_and_versions_cache[se_name] = {}
1826 cmd =
"lcg-info --list-ce " \
1829 "CEStatus=Production," \
1831 (cmssw_version, se_name)
1832 (status, output) = commands.getstatusoutput(cmd)
1834 self.logger.error(
"Could not check site information " \
1835 "for site `%s'" % se_name)
1837 if (len(output) > 0)
or (se_name ==
"caf.cern.ch"):
1838 self.sites_and_versions_cache[se_name][cmssw_version] =
True 1842 self.sites_and_versions_cache[se_name][cmssw_version] =
False 1843 self.logger.debug(
" --> rejecting site `%s'" % se_name)
1844 sites.remove(se_name)
1846 if site_name
is self.no_matching_site_found_str:
1847 self.logger.error(
" --> no matching site found")
1848 self.logger.error(
" --> Your release or SCRAM " \
1849 "architecture may not be available" \
1850 "anywhere on the (LCG) grid.")
1852 self.logger.debug(
" (command used: `%s')" % cmd)
1854 self.logger.debug(
" --> selected site `%s'" % site_name)
1858 if site_name
is None:
1859 site_name = self.no_matching_site_found_str
1862 self.all_sites_found =
False 1874 parser = optparse.OptionParser(version=
"%s %s" % \
1875 (
"%prog", self.version),
1878 self.option_parser = parser
1881 parser.add_option(
"-d",
"--debug",
1882 help=
"Switch on debug mode",
1884 callback=self.option_handler_debug)
1887 parser.add_option(
"-q",
"--quiet",
1888 help=
"Be less verbose",
1890 callback=self.option_handler_quiet)
1894 parser.add_option(
"",
"--force",
1895 help=
"Force mode. Do not abort on sanity check " 1898 callback=self.option_handler_force)
1901 parser.add_option(
"",
"--harvesting_type",
1902 help=
"Harvesting type: %s" % \
1903 ", ".
join(self.harvesting_types),
1905 callback=self.option_handler_harvesting_type,
1907 metavar=
"HARVESTING_TYPE")
1910 parser.add_option(
"",
"--harvesting_mode",
1911 help=
"Harvesting mode: %s (default = %s)" % \
1912 (
", ".
join(self.harvesting_modes),
1913 self.harvesting_mode_default),
1915 callback=self.option_handler_harvesting_mode,
1917 metavar=
"HARVESTING_MODE")
1920 parser.add_option(
"",
"--globaltag",
1921 help=
"GlobalTag to use. Default is the ones " \
1922 "the dataset was created with for MC, for data" \
1923 "a GlobalTag has to be specified.",
1925 callback=self.option_handler_globaltag,
1927 metavar=
"GLOBALTAG")
1930 parser.add_option(
"",
"--no-ref-hists",
1931 help=
"Don't use any reference histograms",
1933 callback=self.option_handler_no_ref_hists)
1937 parser.add_option(
"",
"--frontier-connection",
1938 help=
"Use this Frontier connection to find " \
1939 "GlobalTags and LocalTags (for reference " \
1940 "histograms).\nPlease only use this for " \
1943 callback=self.option_handler_frontier_connection,
1949 parser.add_option(
"",
"--frontier-connection-for-globaltag",
1950 help=
"Use this Frontier connection to find " \
1951 "GlobalTags.\nPlease only use this for " \
1954 callback=self.option_handler_frontier_connection,
1960 parser.add_option(
"",
"--frontier-connection-for-refhists",
1961 help=
"Use this Frontier connection to find " \
1962 "LocalTags (for reference " \
1963 "histograms).\nPlease only use this for " \
1966 callback=self.option_handler_frontier_connection,
1972 parser.add_option(
"",
"--dataset",
1973 help=
"Name (or regexp) of dataset(s) to process",
1976 callback=self.option_handler_input_spec,
1983 parser.add_option(
"",
"--dataset-ignore",
1984 help=
"Name (or regexp) of dataset(s) to ignore",
1986 callback=self.option_handler_input_spec,
1988 metavar=
"DATASET-IGNORE")
1992 parser.add_option(
"",
"--runs",
1993 help=
"Run number(s) to process",
1995 callback=self.option_handler_input_spec,
2001 parser.add_option(
"",
"--runs-ignore",
2002 help=
"Run number(s) to ignore",
2004 callback=self.option_handler_input_spec,
2006 metavar=
"RUNS-IGNORE")
2010 parser.add_option(
"",
"--datasetfile",
2011 help=
"File containing list of dataset names " \
2012 "(or regexps) to process",
2015 callback=self.option_handler_input_spec,
2018 metavar=
"DATASETFILE")
2022 parser.add_option(
"",
"--datasetfile-ignore",
2023 help=
"File containing list of dataset names " \
2024 "(or regexps) to ignore",
2026 callback=self.option_handler_input_spec,
2028 metavar=
"DATASETFILE-IGNORE")
2032 parser.add_option(
"",
"--runslistfile",
2033 help=
"File containing list of run numbers " \
2036 callback=self.option_handler_input_spec,
2038 metavar=
"RUNSLISTFILE")
2042 parser.add_option(
"",
"--runslistfile-ignore",
2043 help=
"File containing list of run numbers " \
2046 callback=self.option_handler_input_spec,
2048 metavar=
"RUNSLISTFILE-IGNORE")
2052 parser.add_option(
"",
"--Jsonrunfile",
2053 help=
"Jsonfile containing dictionary of run/lumisections pairs. " \
2054 "All lumisections of runs contained in dictionary are processed.",
2056 callback=self.option_handler_input_Jsonrunfile,
2058 metavar=
"JSONRUNFILE")
2062 parser.add_option(
"",
"--Jsonfile",
2063 help=
"Jsonfile containing dictionary of run/lumisections pairs. " \
2064 "Only specified lumisections of runs contained in dictionary are processed.",
2066 callback=self.option_handler_input_Jsonfile,
2072 parser.add_option(
"",
"--todo-file",
2073 help=
"Todo file containing a list of runs to process.",
2075 callback=self.option_handler_input_todofile,
2077 metavar=
"TODO-FILE")
2081 parser.add_option(
"",
"--refhistmappingfile",
2082 help=
"File to be use for the reference " \
2083 "histogram mappings. Default: `%s'." % \
2084 self.ref_hist_mappings_file_name_default,
2086 callback=self.option_handler_ref_hist_mapping_file,
2088 metavar=
"REFHISTMAPPING-FILE")
2093 parser.add_option(
"",
"--castordir",
2094 help=
"Place on CASTOR to store results. " \
2095 "Default: `%s'." % \
2096 self.castor_base_dir_default,
2098 callback=self.option_handler_castor_dir,
2100 metavar=
"CASTORDIR")
2104 parser.add_option(
"",
"--no-t1access",
2105 help=
"Try to create jobs that will run " \
2106 "without special `t1access' role",
2108 callback=self.option_handler_no_t1access)
2111 parser.add_option(
"",
"--caf-access",
2112 help=
"Crab jobs may run " \
2115 callback=self.option_handler_caf_access)
2118 parser.add_option(
"",
"--saveByLumiSection",
2119 help=
"set saveByLumiSection=1 in harvesting cfg file",
2121 callback=self.option_handler_saveByLumiSection)
2124 parser.add_option(
"",
"--automatic-crab-submission",
2125 help=
"Crab jobs are created and " \
2126 "submitted automatically",
2128 callback=self.option_handler_crab_submission)
2132 parser.add_option(
"",
"--max-sites",
2133 help=
"Max. number of sites each job is submitted to",
2135 callback=self.option_handler_sites,
2139 parser.add_option(
"",
"--site",
2140 help=
"Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \ 2141 srm-cms.cern.ch : CH \ 2142 ccsrm.in2p3.fr : FR \ 2143 cmssrm-fzk.gridka.de : DE \ 2144 cmssrm.fnal.gov : GOV \ 2145 gridka-dCache.fzk.de : DE2 \ 2146 rm-cms.gridpp.rl.ac.uk : UK \ 2147 srm.grid.sinica.edu.tw : TW \ 2148 srm2.grid.sinica.edu.tw : TW2 \ 2149 srmcms.pic.es : ES \ 2150 storm-fe-cms.cr.cnaf.infn.it : IT",
2152 callback=self.option_handler_preferred_site,
2157 parser.add_option(
"-l",
"--list",
2158 help=
"List all harvesting types and their" \
2159 "corresponding sequence names",
2161 callback=self.option_handler_list_types)
2167 if len(self.cmd_line_opts) < 1:
2168 self.cmd_line_opts = [
"--help"]
2177 for i
in [
"-d",
"--debug",
2179 if i
in self.cmd_line_opts:
2180 self.cmd_line_opts.remove(i)
2181 self.cmd_line_opts.insert(0, i)
2184 parser.set_defaults()
2185 (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2192 """Check completeness and correctness of input information. 2194 Check that all required information has been specified and 2195 that, at least as far as can be easily checked, it makes 2198 NOTE: This is also where any default values are applied. 2202 self.logger.info(
"Checking completeness/correctness of input...")
2206 if len(self.args) > 0:
2207 msg =
"Sorry but I don't understand `%s'" % \
2208 (
" ".
join(self.args))
2209 self.logger.fatal(msg)
2215 if self.harvesting_mode ==
"two-step":
2216 msg =
"--------------------\n" \
2217 " Sorry, but for the moment (well, till it works)" \
2218 " the two-step mode has been disabled.\n" \
2219 "--------------------\n" 2220 self.logger.fatal(msg)
2225 if self.harvesting_type
is None:
2226 msg =
"Please specify a harvesting type" 2227 self.logger.fatal(msg)
2230 if self.harvesting_mode
is None:
2231 self.harvesting_mode = self.harvesting_mode_default
2232 msg =
"No harvesting mode specified --> using default `%s'" % \
2233 self.harvesting_mode
2234 self.logger.warning(msg)
2240 if self.input_method[
"datasets"][
"use"]
is None:
2241 msg =
"Please specify an input dataset name " \
2242 "or a list file name" 2243 self.logger.fatal(msg)
2248 assert not self.input_name[
"datasets"][
"use"]
is None 2255 if self.use_ref_hists:
2256 if self.ref_hist_mappings_file_name
is None:
2257 self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2258 msg =
"No reference histogram mapping file specified --> " \
2259 "using default `%s'" % \
2260 self.ref_hist_mappings_file_name
2261 self.logger.warning(msg)
2267 if self.castor_base_dir
is None:
2268 self.castor_base_dir = self.castor_base_dir_default
2269 msg =
"No CASTOR area specified -> using default `%s'" % \
2270 self.castor_base_dir
2271 self.logger.warning(msg)
2275 if not self.castor_base_dir.startswith(self.castor_prefix):
2276 msg =
"CASTOR area does not start with `%s'" % \
2278 self.logger.fatal(msg)
2279 if self.castor_base_dir.find(
"castor") > -1
and \
2280 not self.castor_base_dir.find(
"cern.ch") > -1:
2281 self.logger.fatal(
"Only CERN CASTOR is supported")
2292 if self.globaltag
is None:
2293 self.logger.warning(
"No GlobalTag specified. This means I cannot")
2294 self.logger.warning(
"run on data, only on MC.")
2295 self.logger.warning(
"I will skip all data datasets.")
2300 if not self.globaltag
is None:
2301 if not self.globaltag.endswith(
"::All"):
2302 self.logger.warning(
"Specified GlobalTag `%s' does " \
2303 "not end in `::All' --> " \
2304 "appending this missing piece" % \
2306 self.globaltag =
"%s::All" % self.globaltag
2311 for (key, value)
in six.iteritems(self.frontier_connection_name):
2312 frontier_type_str =
"unknown" 2313 if key ==
"globaltag":
2314 frontier_type_str =
"the GlobalTag" 2315 elif key ==
"refhists":
2316 frontier_type_str =
"the reference histograms" 2318 if self.frontier_connection_overridden[key] ==
True:
2322 self.logger.info(
"Using %sdefault Frontier " \
2323 "connection for %s: `%s'" % \
2324 (non_str, frontier_type_str, value))
2333 """Check if CMSSW is setup. 2340 cmssw_version = os.getenv(
"CMSSW_VERSION")
2341 if cmssw_version
is None:
2342 self.logger.fatal(
"It seems CMSSW is not setup...")
2343 self.logger.fatal(
"($CMSSW_VERSION is empty)")
2344 raise Error(
"ERROR: CMSSW needs to be setup first!")
2346 self.cmssw_version = cmssw_version
2347 self.logger.info(
"Found CMSSW version %s properly set up" % \
2356 """Check if DBS is setup. 2363 dbs_home = os.getenv(
"DBSCMD_HOME")
2364 if dbs_home
is None:
2365 self.logger.fatal(
"It seems DBS is not setup...")
2366 self.logger.fatal(
" $DBSCMD_HOME is empty")
2367 raise Error(
"ERROR: DBS needs to be setup first!")
2385 self.logger.debug(
"Found DBS properly set up")
2393 """Setup the Python side of DBS. 2395 For more information see the DBS Python API documentation: 2396 https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation 2402 args[
"url"]=
"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2403 "servlet/DBSServlet" 2407 except DBSAPI.dbsApiException.DbsApiException
as ex:
2408 self.logger.fatal(
"Caught DBS API exception %s: %s " % \
2409 (ex.getClassName(), ex.getErrorMessage()))
2410 if ex.getErrorCode()
not in (
None,
""):
2411 logger.debug(
"DBS exception error code: ", ex.getErrorCode())
2419 """Use DBS to resolve a wildcarded dataset name. 2425 assert not self.dbs_api
is None 2430 if not (dataset_name.startswith(
"/")
and \
2431 dataset_name.endswith(
"RECO")):
2432 self.logger.warning(
"Dataset name `%s' does not sound " \
2433 "like a valid dataset name!" % \
2439 dbs_query =
"find dataset where dataset like %s " \
2440 "and dataset.status = VALID" % \
2443 api_result = api.executeQuery(dbs_query)
2444 except DBSAPI.dbsApiException.DbsApiException:
2445 msg =
"ERROR: Could not execute DBS query" 2446 self.logger.fatal(msg)
2451 parser = xml.sax.make_parser()
2452 parser.setContentHandler(handler)
2456 xml.sax.parseString(api_result, handler)
2457 except SAXParseException:
2458 msg =
"ERROR: Could not parse DBS server output" 2459 self.logger.fatal(msg)
2463 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2467 datasets = handler.results.values()[0]
2475 """Ask DBS for the CMSSW version used to create this dataset. 2481 assert not self.dbs_api
is None 2485 dbs_query =
"find algo.version where dataset = %s " \
2486 "and dataset.status = VALID" % \
2489 api_result = api.executeQuery(dbs_query)
2490 except DBSAPI.dbsApiException.DbsApiException:
2491 msg =
"ERROR: Could not execute DBS query" 2492 self.logger.fatal(msg)
2496 parser = xml.sax.make_parser()
2497 parser.setContentHandler(handler)
2500 xml.sax.parseString(api_result, handler)
2501 except SAXParseException:
2502 msg =
"ERROR: Could not parse DBS server output" 2503 self.logger.fatal(msg)
2507 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2510 cmssw_version = handler.results.values()[0]
2513 assert len(cmssw_version) == 1
2516 cmssw_version = cmssw_version[0]
2519 return cmssw_version
2569 """Ask DBS for the list of runs in a given dataset. 2571 # NOTE: This does not (yet?) skip/remove empty runs. There is 2572 # a bug in the DBS entry run.numevents (i.e. it always returns 2573 # zero) which should be fixed in the `next DBS release'. 2575 # https://savannah.cern.ch/bugs/?53452 2576 # https://savannah.cern.ch/bugs/?53711 2587 assert not self.dbs_api
is None 2591 dbs_query =
"find run where dataset = %s " \
2592 "and dataset.status = VALID" % \
2595 api_result = api.executeQuery(dbs_query)
2596 except DBSAPI.dbsApiException.DbsApiException:
2597 msg =
"ERROR: Could not execute DBS query" 2598 self.logger.fatal(msg)
2602 parser = xml.sax.make_parser()
2603 parser.setContentHandler(handler)
2606 xml.sax.parseString(api_result, handler)
2607 except SAXParseException:
2608 msg =
"ERROR: Could not parse DBS server output" 2609 self.logger.fatal(msg)
2613 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2616 runs = handler.results.values()[0]
2618 runs = sorted([
int(i)
for i
in runs])
2626 """Ask DBS for the globaltag corresponding to a given dataset. 2629 # This does not seem to work for data datasets? E.g. for 2630 # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO 2631 # Probaly due to the fact that the GlobalTag changed during 2639 assert not self.dbs_api
is None 2643 dbs_query =
"find dataset.tag where dataset = %s " \
2644 "and dataset.status = VALID" % \
2647 api_result = api.executeQuery(dbs_query)
2648 except DBSAPI.dbsApiException.DbsApiException:
2649 msg =
"ERROR: Could not execute DBS query" 2650 self.logger.fatal(msg)
2654 parser = xml.sax.make_parser()
2655 parser.setContentHandler(parser)
2658 xml.sax.parseString(api_result, handler)
2659 except SAXParseException:
2660 msg =
"ERROR: Could not parse DBS server output" 2661 self.logger.fatal(msg)
2665 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2668 globaltag = handler.results.values()[0]
2671 assert len(globaltag) == 1
2674 globaltag = globaltag[0]
2682 """Ask DBS for the the data type (data or mc) of a given 2689 assert not self.dbs_api
is None 2693 dbs_query =
"find datatype.type where dataset = %s " \
2694 "and dataset.status = VALID" % \
2697 api_result = api.executeQuery(dbs_query)
2698 except DBSAPI.dbsApiException.DbsApiException:
2699 msg =
"ERROR: Could not execute DBS query" 2700 self.logger.fatal(msg)
2704 parser = xml.sax.make_parser()
2705 parser.setContentHandler(handler)
2708 xml.sax.parseString(api_result, handler)
2709 except SAXParseException:
2710 msg =
"ERROR: Could not parse DBS server output" 2711 self.logger.fatal(msg)
2715 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2718 datatype = handler.results.values()[0]
2721 assert len(datatype) == 1
2724 datatype = datatype[0]
2735 """Determine the number of events in a given dataset (and run). 2737 Ask DBS for the number of events in a dataset. If a run number 2738 is specified the number of events returned is that in that run 2739 of that dataset. If problems occur we throw an exception. 2742 # Since DBS does not return the number of events correctly, 2743 # neither for runs nor for whole datasets, we have to work 2744 # around that a bit... 2751 assert not self.dbs_api
is None 2755 dbs_query =
"find file.name, file.numevents where dataset = %s " \
2756 "and dataset.status = VALID" % \
2758 if not run_number
is None:
2759 dbs_query = dbq_query + (
" and run = %d" % run_number)
2761 api_result = api.executeQuery(dbs_query)
2762 except DBSAPI.dbsApiException.DbsApiException:
2763 msg =
"ERROR: Could not execute DBS query" 2764 self.logger.fatal(msg)
2768 parser = xml.sax.make_parser()
2769 parser.setContentHandler(handler)
2772 xml.sax.parseString(api_result, handler)
2773 except SAXParseException:
2774 msg =
"ERROR: Could not parse DBS server output" 2775 self.logger.fatal(msg)
2779 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2782 num_events = sum(handler.results[
"file.numevents"])
3076 """Figure out the number of events in each run of this dataset. 3078 This is a more efficient way of doing this than calling 3079 dbs_resolve_number_of_events for each run. 3083 self.logger.debug(
"Checking spread of dataset `%s'" % dataset_name)
3087 assert not self.dbs_api
is None 3091 dbs_query =
"find run.number, site, file.name, file.numevents " \
3092 "where dataset = %s " \
3093 "and dataset.status = VALID" % \
3096 api_result = api.executeQuery(dbs_query)
3097 except DBSAPI.dbsApiException.DbsApiException:
3098 msg =
"ERROR: Could not execute DBS query" 3099 self.logger.fatal(msg)
3102 handler =
DBSXMLHandler([
"run.number",
"site",
"file.name",
"file.numevents"])
3103 parser = xml.sax.make_parser()
3104 parser.setContentHandler(handler)
3147 xml.sax.parseString(api_result, handler)
3148 except SAXParseException:
3149 msg =
"ERROR: Could not parse DBS server output" 3150 self.logger.fatal(msg)
3154 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 3161 for (index, site_name)
in enumerate(handler.results[
"site"]):
3171 if len(site_name) < 1:
3173 run_number =
int(handler.results[
"run.number"][index])
3174 file_name = handler.results[
"file.name"][index]
3175 nevents =
int(handler.results[
"file.numevents"][index])
3178 if run_number
not in files_info:
3180 files_info[run_number] = {}
3181 files_info[run_number][file_name] = (nevents,
3183 elif file_name
not in files_info[run_number]:
3185 files_info[run_number][file_name] = (nevents,
3192 assert nevents == files_info[run_number][file_name][0]
3194 files_info[run_number][file_name][1].
append(site_name)
3199 for run_number
in files_info.keys():
3200 files_without_sites = [i
for (i, j)
in \
3201 files_info[run_number].
items() \
3203 if len(files_without_sites) > 0:
3204 self.logger.warning(
"Removing %d file(s)" \
3205 " with empty site names" % \
3206 len(files_without_sites))
3207 for file_name
in files_without_sites:
3208 del files_info[run_number][file_name]
3214 num_events_catalog = {}
3215 for run_number
in files_info.keys():
3216 site_names =
list(set([j
for i
in files_info[run_number].
values()
for j
in i[1]]))
3222 if len(site_names) > 1:
3230 all_file_names = files_info[run_number].
keys()
3231 all_file_names = set(all_file_names)
3232 sites_with_complete_copies = []
3233 for site_name
in site_names:
3234 files_at_site = [i
for (i, (j, k)) \
3235 in files_info[run_number].
items() \
3237 files_at_site = set(files_at_site)
3238 if files_at_site == all_file_names:
3239 sites_with_complete_copies.append(site_name)
3240 if len(sites_with_complete_copies) < 1:
3246 if len(sites_with_complete_copies) > 1:
3265 self.logger.debug(
" -> run appears to be `mirrored'")
3267 self.logger.debug(
" -> run appears to be spread-out")
3270 len(sites_with_complete_copies) != len(site_names):
3274 for (file_name, (i, sites))
in files_info[run_number].
items():
3275 complete_sites = [site
for site
in sites \
3276 if site
in sites_with_complete_copies]
3277 files_info[run_number][file_name] = (i, complete_sites)
3278 site_names = sites_with_complete_copies
3280 self.logger.debug(
" for run #%d:" % run_number)
3281 num_events_catalog[run_number] = {}
3282 num_events_catalog[run_number][
"all_sites"] = sum([i[0]
for i
in files_info[run_number].
values()])
3283 if len(site_names) < 1:
3284 self.logger.debug(
" run is not available at any site")
3285 self.logger.debug(
" (but should contain %d events" % \
3286 num_events_catalog[run_number][
"all_sites"])
3288 self.logger.debug(
" at all sites combined there are %d events" % \
3289 num_events_catalog[run_number][
"all_sites"])
3290 for site_name
in site_names:
3291 num_events_catalog[run_number][site_name] = sum([i[0]
for i
in files_info[run_number].
values()
if site_name
in i[1]])
3292 self.logger.debug(
" at site `%s' there are %d events" % \
3293 (site_name, num_events_catalog[run_number][site_name]))
3294 num_events_catalog[run_number][
"mirrored"] = mirrored
3297 return num_events_catalog
3357 """Build a list of all datasets to be processed. 3366 if input_method
is None:
3368 elif input_method ==
"dataset":
3373 self.logger.info(
"Asking DBS for dataset names")
3374 dataset_names = self.dbs_resolve_dataset_name(input_name)
3375 elif input_method ==
"datasetfile":
3380 self.logger.info(
"Reading input from list file `%s'" % \
3383 listfile = open(
"/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name,
"r") 3384 print "open listfile" 3385 for dataset
in listfile:
3387 dataset_stripped = dataset.strip()
3388 if len(dataset_stripped) < 1:
3391 if dataset_stripped[0] !=
"#":
3392 dataset_names.extend(self. \
3396 msg =
"ERROR: Could not open input list file `%s'" % \
3398 self.logger.fatal(msg)
3403 assert False,
"Unknown input method `%s'" % input_method
3411 dataset_names = sorted(set(dataset_names))
3415 return dataset_names
3420 """Build a list of datasets to process. 3424 self.logger.info(
"Building list of datasets to consider...")
3426 input_method = self.input_method[
"datasets"][
"use"]
3427 input_name = self.input_name[
"datasets"][
"use"]
3428 dataset_names = self.build_dataset_list(input_method,
3431 [
None] * len(dataset_names))))
3433 self.logger.info(
" found %d dataset(s) to process:" % \
3435 for dataset
in dataset_names:
3436 self.logger.info(
" `%s'" % dataset)
3443 """Build a list of datasets to ignore. 3445 NOTE: We should always have a list of datasets to process, but 3446 it may be that we don't have a list of datasets to ignore. 3450 self.logger.info(
"Building list of datasets to ignore...")
3452 input_method = self.input_method[
"datasets"][
"ignore"]
3453 input_name = self.input_name[
"datasets"][
"ignore"]
3454 dataset_names = self.build_dataset_list(input_method,
3457 [
None] * len(dataset_names))))
3459 self.logger.info(
" found %d dataset(s) to ignore:" % \
3461 for dataset
in dataset_names:
3462 self.logger.info(
" `%s'" % dataset)
3474 if input_method
is None:
3476 elif input_method ==
"runs":
3479 self.logger.info(
"Reading list of runs from the " \
3481 runs.extend([
int(i.strip()) \
3482 for i
in input_name.split(
",") \
3483 if len(i.strip()) > 0])
3484 elif input_method ==
"runslistfile":
3486 self.logger.info(
"Reading list of runs from file `%s'" % \
3489 listfile = open(input_name,
"r") 3490 for run
in listfile:
3492 run_stripped = run.strip()
3493 if len(run_stripped) < 1:
3496 if run_stripped[0] !=
"#":
3497 runs.append(
int(run_stripped))
3500 msg =
"ERROR: Could not open input list file `%s'" % \
3502 self.logger.fatal(msg)
3508 assert False,
"Unknown input method `%s'" % input_method
3512 runs =
list(set(runs))
3520 """Build a list of runs to process. 3524 self.logger.info(
"Building list of runs to consider...")
3526 input_method = self.input_method[
"runs"][
"use"]
3527 input_name = self.input_name[
"runs"][
"use"]
3528 runs = self.build_runs_list(input_method, input_name)
3531 self.logger.info(
" found %d run(s) to process:" % \
3534 self.logger.info(
" %s" %
", ".
join([
str(i)
for i
in runs]))
3541 """Build a list of runs to ignore. 3543 NOTE: We should always have a list of runs to process, but 3544 it may be that we don't have a list of runs to ignore. 3548 self.logger.info(
"Building list of runs to ignore...")
3550 input_method = self.input_method[
"runs"][
"ignore"]
3551 input_name = self.input_name[
"runs"][
"ignore"]
3552 runs = self.build_runs_list(input_method, input_name)
3555 self.logger.info(
" found %d run(s) to ignore:" % \
3558 self.logger.info(
" %s" %
", ".
join([
str(i)
for i
in runs]))
3565 """Update the list of datasets taking into account the ones to 3568 Both lists have been generated before from DBS and both are 3569 assumed to be unique. 3571 NOTE: The advantage of creating the ignore list from DBS (in 3572 case a regexp is given) and matching that instead of directly 3573 matching the ignore criterion against the list of datasets (to 3574 consider) built from DBS is that in the former case we're sure 3575 that all regexps are treated exactly as DBS would have done 3576 without the cmsHarvester. 3578 NOTE: This only removes complete samples. Exclusion of single 3579 runs is done by the book keeping. So the assumption is that a 3580 user never wants to harvest just part (i.e. n out of N runs) 3585 self.logger.info(
"Processing list of datasets to ignore...")
3587 self.logger.debug(
"Before processing ignore list there are %d " \
3588 "datasets in the list to be processed" % \
3589 len(self.datasets_to_use))
3592 dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3593 for dataset_name
in self.datasets_to_use.keys():
3594 if dataset_name
in self.datasets_to_ignore.keys():
3595 del dataset_names_filtered[dataset_name]
3597 self.logger.info(
" --> Removed %d dataset(s)" % \
3598 (len(self.datasets_to_use) -
3599 len(dataset_names_filtered)))
3601 self.datasets_to_use = dataset_names_filtered
3603 self.logger.debug(
"After processing ignore list there are %d " \
3604 "datasets in the list to be processed" % \
3605 len(self.datasets_to_use))
3613 self.logger.info(
"Processing list of runs to use and ignore...")
3624 runs_to_use = self.runs_to_use
3625 runs_to_ignore = self.runs_to_ignore
3627 for dataset_name
in self.datasets_to_use:
3628 runs_in_dataset = self.datasets_information[dataset_name][
"runs"]
3631 runs_to_use_tmp = []
3632 for run
in runs_to_use:
3633 if not run
in runs_in_dataset:
3634 self.logger.warning(
"Dataset `%s' does not contain " \
3635 "requested run %d " \
3636 "--> ignoring `use' of this run" % \
3637 (dataset_name, run))
3639 runs_to_use_tmp.append(run)
3641 if len(runs_to_use) > 0:
3642 runs = runs_to_use_tmp
3643 self.logger.info(
"Using %d out of %d runs " \
3644 "of dataset `%s'" % \
3645 (len(runs), len(runs_in_dataset),
3648 runs = runs_in_dataset
3650 if len(runs_to_ignore) > 0:
3653 if not run
in runs_to_ignore:
3654 runs_tmp.append(run)
3655 self.logger.info(
"Ignoring %d out of %d runs " \
3656 "of dataset `%s'" % \
3657 (len(runs)- len(runs_tmp),
3658 len(runs_in_dataset),
3662 if self.todofile !=
"YourToDofile.txt":
3664 print "Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile
3665 cmd=
"grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3666 (status, output)=commands.getstatusoutput(cmd)
3669 if run_str
in output:
3670 runs_todo.append(run)
3671 self.logger.info(
"Using %d runs " \
3672 "of dataset `%s'" % \
3678 if self.Jsonfilename !=
"YourJSON.txt":
3680 self.Jsonlumi =
True 3683 self.logger.info(
"Reading runs and lumisections from file `%s'" % \
3686 Jsonfile = open(self.Jsonfilename,
"r") 3687 for names
in Jsonfile:
3688 dictNames= eval(
str(names))
3689 for key
in dictNames:
3691 Json_runs.append(intkey)
3694 msg =
"ERROR: Could not open Jsonfile `%s'" % \
3696 self.logger.fatal(msg)
3699 if run
in Json_runs:
3700 good_runs.append(run)
3701 self.logger.info(
"Using %d runs " \
3702 "of dataset `%s'" % \
3706 if (self.Jsonrunfilename !=
"YourJSON.txt")
and (self.Jsonfilename ==
"YourJSON.txt"):
3710 self.logger.info(
"Reading runs from file `%s'" % \
3711 self.Jsonrunfilename)
3713 Jsonfile = open(self.Jsonrunfilename,
"r") 3714 for names
in Jsonfile:
3715 dictNames= eval(
str(names))
3716 for key
in dictNames:
3718 Json_runs.append(intkey)
3721 msg =
"ERROR: Could not open Jsonfile `%s'" % \
3723 self.logger.fatal(msg)
3726 if run
in Json_runs:
3727 good_runs.append(run)
3728 self.logger.info(
"Using %d runs " \
3729 "of dataset `%s'" % \
3734 self.datasets_to_use[dataset_name] = runs
3741 """Remove all but the largest part of all datasets. 3743 This allows us to harvest at least part of these datasets 3744 using single-step harvesting until the two-step approach 3750 assert self.harvesting_mode ==
"single-step-allow-partial" 3753 for dataset_name
in self.datasets_to_use:
3754 for run_number
in self.datasets_information[dataset_name][
"runs"]:
3755 max_events =
max(self.datasets_information[dataset_name][
"sites"][run_number].
values())
3756 sites_with_max_events = [i[0]
for i
in self.datasets_information[dataset_name][
"sites"][run_number].
items()
if i[1] == max_events]
3757 self.logger.warning(
"Singlifying dataset `%s', " \
3759 (dataset_name, run_number))
3760 cmssw_version = self.datasets_information[dataset_name] \
3762 selected_site = self.pick_a_site(sites_with_max_events,
3766 nevents_old = self.datasets_information[dataset_name][
"num_events"][run_number]
3767 self.logger.warning(
" --> " \
3768 "only harvesting partial statistics: " \
3769 "%d out of %d events (5.1%f%%) " \
3773 100. * max_events / nevents_old,
3775 self.logger.warning(
"!!! Please note that the number of " \
3776 "events in the output path name will " \
3777 "NOT reflect the actual statistics in " \
3778 "the harvested results !!!")
3785 self.datasets_information[dataset_name][
"sites"][run_number] = {selected_site: max_events}
3786 self.datasets_information[dataset_name][
"num_events"][run_number] = max_events
3794 """Check list of dataset names for impossible ones. 3796 Two kinds of checks are done: 3797 - Checks for things that do not make sense. These lead to 3798 errors and skipped datasets. 3799 - Sanity checks. For these warnings are issued but the user is 3800 considered to be the authoritative expert. 3803 - The CMSSW version encoded in the dataset name should match 3804 self.cmssw_version. This is critical. 3805 - There should be some events in the dataset/run. This is 3806 critical in the sense that CRAB refuses to create jobs for 3807 zero events. And yes, this does happen in practice. E.g. the 3808 reprocessed CRAFT08 datasets contain runs with zero events. 3809 - A cursory check is performed to see if the harvesting type 3810 makes sense for the data type. This should prevent the user 3811 from inadvertently running RelVal for data. 3812 - It is not possible to run single-step harvesting jobs on 3813 samples that are not fully contained at a single site. 3814 - Each dataset/run has to be available at at least one site. 3818 self.logger.info(
"Performing sanity checks on dataset list...")
3820 dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3822 for dataset_name
in self.datasets_to_use.keys():
3825 version_from_dataset = self.datasets_information[dataset_name] \
3827 if version_from_dataset != self.cmssw_version:
3828 msg =
" CMSSW version mismatch for dataset `%s' " \
3831 self.cmssw_version, version_from_dataset)
3832 if self.force_running:
3834 self.logger.warning(
"%s " \
3835 "--> `force mode' active: " \
3838 del dataset_names_after_checks[dataset_name]
3839 self.logger.warning(
"%s " \
3840 "--> skipping" % msg)
3851 datatype = self.datasets_information[dataset_name][
"datatype"]
3852 if datatype ==
"data":
3854 if self.harvesting_type !=
"DQMOffline":
3856 elif datatype ==
"mc":
3857 if self.harvesting_type ==
"DQMOffline":
3861 assert False,
"ERROR Impossible data type `%s' " \
3862 "for dataset `%s'" % \
3863 (datatype, dataset_name)
3865 msg =
" Normally one does not run `%s' harvesting " \
3866 "on %s samples, are you sure?" % \
3867 (self.harvesting_type, datatype)
3868 if self.force_running:
3869 self.logger.warning(
"%s " \
3870 "--> `force mode' active: " \
3873 del dataset_names_after_checks[dataset_name]
3874 self.logger.warning(
"%s " \
3875 "--> skipping" % msg)
3889 if datatype ==
"data":
3890 if self.globaltag
is None:
3891 msg =
"For data datasets (like `%s') " \
3892 "we need a GlobalTag" % \
3894 del dataset_names_after_checks[dataset_name]
3895 self.logger.warning(
"%s " \
3896 "--> skipping" % msg)
3906 globaltag = self.datasets_information[dataset_name][
"globaltag"]
3907 if not globaltag
in self.globaltag_check_cache:
3908 if self.check_globaltag(globaltag):
3909 self.globaltag_check_cache.append(globaltag)
3911 msg =
"Something is wrong with GlobalTag `%s' " \
3912 "used by dataset `%s'!" % \
3913 (globaltag, dataset_name)
3914 if self.use_ref_hists:
3915 msg +=
"\n(Either it does not exist or it " \
3916 "does not contain the required key to " \
3917 "be used with reference histograms.)" 3919 msg +=
"\n(It probably just does not exist.)" 3920 self.logger.fatal(msg)
3926 runs_without_sites = [i
for (i, j)
in \
3927 self.datasets_information[dataset_name] \
3930 i
in self.datasets_to_use[dataset_name]]
3931 if len(runs_without_sites) > 0:
3932 for run_without_sites
in runs_without_sites:
3934 dataset_names_after_checks[dataset_name].
remove(run_without_sites)
3937 self.logger.warning(
" removed %d unavailable run(s) " \
3938 "from dataset `%s'" % \
3939 (len(runs_without_sites), dataset_name))
3940 self.logger.debug(
" (%s)" % \
3942 runs_without_sites]))
3948 if not self.harvesting_mode ==
"two-step":
3949 for run_number
in self.datasets_to_use[dataset_name]:
3954 num_sites = len(self.datasets_information[dataset_name] \
3955 [
"sites"][run_number])
3956 if num_sites > 1
and \
3957 not self.datasets_information[dataset_name] \
3958 [
"mirrored"][run_number]:
3962 msg =
" Dataset `%s', run %d is spread across more " \
3963 "than one site.\n" \
3964 " Cannot run single-step harvesting on " \
3965 "samples spread across multiple sites" % \
3966 (dataset_name, run_number)
3968 dataset_names_after_checks[dataset_name].
remove(run_number)
3971 self.logger.warning(
"%s " \
3972 "--> skipping" % msg)
3981 tmp = [j
for (i, j)
in self.datasets_information \
3982 [dataset_name][
"num_events"].
items() \
3983 if i
in self.datasets_to_use[dataset_name]]
3984 num_events_dataset = sum(tmp)
3986 if num_events_dataset < 1:
3987 msg =
" dataset `%s' is empty" % dataset_name
3988 del dataset_names_after_checks[dataset_name]
3989 self.logger.warning(
"%s " \
3990 "--> skipping" % msg)
4003 self.datasets_information[dataset_name] \
4004 [
"num_events"].
items()
if i[1] < 1]
4005 tmp = [i
for i
in tmp
if i[0]
in self.datasets_to_use[dataset_name]]
4006 empty_runs =
dict(tmp)
4007 if len(empty_runs) > 0:
4008 for empty_run
in empty_runs:
4010 dataset_names_after_checks[dataset_name].
remove(empty_run)
4013 self.logger.info(
" removed %d empty run(s) from dataset `%s'" % \
4014 (len(empty_runs), dataset_name))
4015 self.logger.debug(
" (%s)" % \
4016 ", ".
join([
str(i)
for i
in empty_runs]))
4022 dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4023 for (dataset_name, runs)
in six.iteritems(dataset_names_after_checks):
4025 self.logger.warning(
" Removing dataset without any runs " \
4028 del dataset_names_after_checks_tmp[dataset_name]
4029 dataset_names_after_checks = dataset_names_after_checks_tmp
4033 self.logger.warning(
" --> Removed %d dataset(s)" % \
4034 (len(self.datasets_to_use) -
4035 len(dataset_names_after_checks)))
4038 self.datasets_to_use = dataset_names_after_checks
4045 """Escape a DBS dataset name. 4047 Escape a DBS dataset name such that it does not cause trouble 4048 with the file system. This means turning each `/' into `__', 4049 except for the first one which is just removed. 4053 escaped_dataset_name = dataset_name
4054 escaped_dataset_name = escaped_dataset_name.strip(
"/")
4055 escaped_dataset_name = escaped_dataset_name.replace(
"/",
"__")
4057 return escaped_dataset_name
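
    # Example (hypothetical dataset name): escape_dataset_name() turns
    #
    #   /MinimumBias/Run2010A-v1/RECO
    #
    # into
    #
    #   MinimumBias__Run2010A-v1__RECO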
4064 """Generate the name of the configuration file to be run by 4067 Depending on the harvesting mode (single-step or two-step) 4068 this is the name of the real harvesting configuration or the 4069 name of the first-step ME summary extraction configuration. 4073 if self.harvesting_mode ==
"single-step":
4074 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4075 elif self.harvesting_mode ==
"single-step-allow-partial":
4076 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4083 elif self.harvesting_mode ==
"two-step":
4084 config_file_name = self.create_me_summary_config_file_name(dataset_name)
4086 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4087 self.harvesting_mode
4090 return config_file_name
4096 "Generate the name to be used for the harvesting config file." 4098 file_name_base =
"harvesting.py" 4099 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4100 config_file_name = file_name_base.replace(
".py",
4102 dataset_name_escaped)
4105 return config_file_name
4110 "Generate the name of the ME summary extraction config file." 4112 file_name_base =
"me_extraction.py" 4113 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4114 config_file_name = file_name_base.replace(
".py",
4116 dataset_name_escaped)
4119 return config_file_name
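
    # With the naming scheme above, a (hypothetical) dataset
    # /MinimumBias/Run2010A-v1/RECO leads to configuration file names
    # like:
    #
    #   harvesting_MinimumBias__Run2010A-v1__RECO.py       (single-step)
    #   me_extraction_MinimumBias__Run2010A-v1__RECO.py    (first step of two-step)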
4124 """Create the name of the output file name to be used. 4126 This is the name of the output file of the `first step'. In 4127 the case of single-step harvesting this is already the final 4128 harvesting output ROOT file. In the case of two-step 4129 harvesting it is the name of the intermediary ME summary 4142 if self.harvesting_mode ==
"single-step":
4144 assert not run_number
is None 4146 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4147 elif self.harvesting_mode ==
"single-step-allow-partial":
4149 assert not run_number
is None 4151 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4152 elif self.harvesting_mode ==
"two-step":
4154 assert run_number
is None 4156 output_file_name = self.create_me_summary_output_file_name(dataset_name)
4159 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4160 self.harvesting_mode
4163 return output_file_name
4168 """Generate the name to be used for the harvesting output file. 4170 This harvesting output file is the _final_ ROOT output file 4171 containing the harvesting results. In case of two-step 4172 harvesting there is an intermediate ME output file as well. 4176 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4184 output_file_name =
"DQM_V0001_R%09d__%s.root" % \
4185 (run_number, dataset_name_escaped)
4186 if self.harvesting_mode.find(
"partial") > -1:
4189 if self.datasets_information[dataset_name] \
4190 [
"mirrored"][run_number] ==
False:
4191 output_file_name = output_file_name.replace(
".root", \
4195 return output_file_name
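
    # Example (hypothetical dataset name and run number): for run
    # 123456 of /MinimumBias/Run2010A-v1/RECO the harvesting output
    # file is called
    #
    #   DQM_V0001_R000123456__MinimumBias__Run2010A-v1__RECO.root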
4200 """Generate the name of the intermediate ME file name to be 4201 used in two-step harvesting. 4205 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4206 output_file_name =
"me_summary_%s.root" % \
4207 dataset_name_escaped
4210 return output_file_name
4215 """Create the block name to use for this dataset/run number. 4217 This is what appears in the brackets `[]' in multicrab.cfg. It 4218 is used as the name of the job and to create output 4223 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4224 block_name =
"%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4232 """Create a CRAB configuration for a given job. 4234 NOTE: This is _not_ a complete (as in: submittable) CRAB 4235 configuration. It is used to store the common settings for the 4236 multicrab configuration. 4238 NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported. 4240 NOTE: According to CRAB, you `Must define exactly two of 4241 total_number_of_events, events_per_job, or 4242 number_of_jobs.'. For single-step harvesting we force one job, 4243 for the rest we don't really care. 4246 # With the current version of CRAB (2.6.1), in which Daniele 4247 # fixed the behaviour of no_block_boundary for me, one _has to 4248 # specify_ the total_number_of_events and one single site in 4249 # the se_white_list. 4257 castor_prefix = self.castor_prefix
4259 tmp.append(self.config_file_header())
4264 tmp.append(
"[CRAB]")
4265 tmp.append(
"jobtype = cmssw")
4270 tmp.append(
"[GRID]")
4271 tmp.append(
"virtual_organization=cms")
4276 tmp.append(
"[USER]")
4277 tmp.append(
"copy_data = 1")
4282 tmp.append(
"[CMSSW]")
4283 tmp.append(
"# This reveals data hosted on T1 sites,")
4284 tmp.append(
"# which is normally hidden by CRAB.")
4285 tmp.append(
"show_prod = 1")
4286 tmp.append(
"number_of_jobs = 1")
4287 if self.Jsonlumi ==
True:
4288 tmp.append(
"lumi_mask = %s" % self.Jsonfilename)
4289 tmp.append(
"total_number_of_lumis = -1")
4291 if self.harvesting_type ==
"DQMOffline":
4292 tmp.append(
"total_number_of_lumis = -1")
4294 tmp.append(
"total_number_of_events = -1")
4295 if self.harvesting_mode.find(
"single-step") > -1:
4296 tmp.append(
"# Force everything to run in one job.")
4297 tmp.append(
"no_block_boundary = 1")
4304 crab_config =
"\n".
join(tmp)
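
    # For reference, the crab.cfg contents built above look roughly
    # like this (sketch; the exact [CMSSW] lines depend on the
    # harvesting type, the harvesting mode and the use of a lumi mask):
    #
    #   [CRAB]
    #   jobtype = cmssw
    #
    #   [GRID]
    #   virtual_organization=cms
    #
    #   [USER]
    #   copy_data = 1
    #
    #   [CMSSW]
    #   show_prod = 1
    #   number_of_jobs = 1
    #   total_number_of_events = -1
    #   no_block_boundary = 1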
4312 """Create a multicrab.cfg file for all samples. 4314 This creates the contents for a multicrab.cfg file that uses 4315 the crab.cfg file (generated elsewhere) for the basic settings 4316 and contains blocks for each run of each dataset. 4319 # The fact that it's necessary to specify the se_white_list 4320 # and the total_number_of_events is due to our use of CRAB 4321 # version 2.6.1. This should no longer be necessary in the 4327 cmd=
"who i am | cut -f1 -d' '" 4328 (status, output)=commands.getstatusoutput(cmd)
4331 if self.caf_access ==
True:
4332 print "Extracting %s as user name" %UserName
4334 number_max_sites = self.nr_max_sites + 1
4336 multicrab_config_lines = []
4337 multicrab_config_lines.append(self.config_file_header())
4338 multicrab_config_lines.append(
"")
4339 multicrab_config_lines.append(
"[MULTICRAB]")
4340 multicrab_config_lines.append(
"cfg = crab.cfg")
4341 multicrab_config_lines.append(
"")
4343 dataset_names = sorted(self.datasets_to_use.keys())
4345 for dataset_name
in dataset_names:
4346 runs = self.datasets_to_use[dataset_name]
4347 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4348 castor_prefix = self.castor_prefix
4353 castor_dir = self.datasets_information[dataset_name] \
4354 [
"castor_path"][run]
4356 cmd =
"rfdir %s" % castor_dir
4357 (status, output) = commands.getstatusoutput(cmd)
4359 if len(output) <= 0:
4365 assert (len(self.datasets_information[dataset_name] \
4366 [
"sites"][run]) == 1)
or \
4367 self.datasets_information[dataset_name][
"mirrored"]
4370 site_names = self.datasets_information[dataset_name] \
4371 [
"sites"][run].
keys()
4373 for i
in range(1, number_max_sites, 1):
4374 if len(site_names) > 0:
4375 index =
"site_%02d" % (i)
4377 config_file_name = self. \
4379 output_file_name = self. \
4390 if len(site_names) > 1:
4391 cmssw_version = self.datasets_information[dataset_name] \
4393 self.logger.info(
"Picking site for mirrored dataset " \
4395 (dataset_name, run))
4396 site_name = self.pick_a_site(site_names, cmssw_version)
4397 if site_name
in site_names:
4398 site_names.remove(site_name)
4401 site_name = site_names[0]
4402 site_names.remove(site_name)
4404 if site_name
is self.no_matching_site_found_str:
4408 nevents = self.datasets_information[dataset_name][
"num_events"][run]
4411 multicrab_block_name = self.create_multicrab_block_name( \
4412 dataset_name, run, index)
4413 multicrab_config_lines.append(
"[%s]" % \
4414 multicrab_block_name)
4418 if site_name ==
"caf.cern.ch":
4419 multicrab_config_lines.append(
"CRAB.use_server=0")
4420 multicrab_config_lines.append(
"CRAB.scheduler=caf")
4422 multicrab_config_lines.append(
"scheduler = glite")
4426 if site_name ==
"caf.cern.ch":
4429 multicrab_config_lines.append(
"GRID.se_white_list = %s" % \
4431 multicrab_config_lines.append(
"# This removes the default blacklisting of T1 sites.")
4432 multicrab_config_lines.append(
"GRID.remove_default_blacklist = 1")
4433 multicrab_config_lines.append(
"GRID.rb = CERN")
4434 if not self.non_t1access:
4435 multicrab_config_lines.append(
"GRID.role = t1access")
4440 castor_dir = castor_dir.replace(castor_prefix,
"")
4441 multicrab_config_lines.append(
"USER.storage_element=srm-cms.cern.ch")
4442 multicrab_config_lines.append(
"USER.user_remote_dir = %s" % \
4444 multicrab_config_lines.append(
"USER.check_user_remote_dir=0")
4446 if site_name ==
"caf.cern.ch":
4447 multicrab_config_lines.append(
"USER.storage_path=%s" % castor_prefix)
4453 multicrab_config_lines.append(
"USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4460 multicrab_config_lines.append(
"CMSSW.pset = %s" % \
4462 multicrab_config_lines.append(
"CMSSW.datasetpath = %s" % \
4464 multicrab_config_lines.append(
"CMSSW.runselection = %d" % \
4467 if self.Jsonlumi ==
True:
4470 if self.harvesting_type ==
"DQMOffline":
4473 multicrab_config_lines.append(
"CMSSW.total_number_of_events = %d" % \
4476 multicrab_config_lines.append(
"CMSSW.output_file = %s" % \
4481 if site_name ==
"caf.cern.ch":
4482 multicrab_config_lines.append(
"CAF.queue=cmscaf1nd")
4486 multicrab_config_lines.append(
"")
4490 self.all_sites_found =
True 4492 multicrab_config =
"\n".
join(multicrab_config_lines)
4495 return multicrab_config
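
    # For reference, each per-run block appended above looks roughly
    # like this (sketch; the site, paths, event count and dataset name
    # shown are hypothetical, and the CAF case differs slightly):
    #
    #   [MinimumBias__Run2010A-v1__RECO_000123456_site_01]
    #   scheduler = glite
    #   GRID.se_white_list = srm.example.site
    #   GRID.remove_default_blacklist = 1
    #   GRID.rb = CERN
    #   GRID.role = t1access
    #   USER.storage_element=srm-cms.cern.ch
    #   USER.user_remote_dir = dqm/offline/harvesting_output/...
    #   USER.check_user_remote_dir=0
    #   USER.storage_path=/srm/managerv2?SFN=/castor/cern.ch/
    #   CMSSW.pset = harvesting_MinimumBias__Run2010A-v1__RECO.py
    #   CMSSW.datasetpath = /MinimumBias/Run2010A-v1/RECO
    #   CMSSW.runselection = 123456
    #   CMSSW.total_number_of_events = 1000000
    #   CMSSW.output_file = DQM_V0001_R000123456__MinimumBias__Run2010A-v1__RECO.root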
4500 """Check if globaltag exists. 4502 Check if globaltag exists as GlobalTag in the database given 4503 by self.frontier_connection_name['globaltag']. If globaltag is 4504 None, self.globaltag is used instead. 4506 If we're going to use reference histograms this method also 4507 checks for the existence of the required key in the GlobalTag. 4511 if globaltag
is None:
4512 globaltag = self.globaltag
4515 if globaltag.endswith(
"::All"):
4516 globaltag = globaltag[:-5]
4518 connect_name = self.frontier_connection_name[
"globaltag"]
4526 connect_name = connect_name.replace(
"frontier://",
4527 "frontier://cmsfrontier:8000/")
4529 connect_name += self.db_account_name_cms_cond_globaltag()
4531 tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4535 tag_contains_ref_hist_key =
False 4536 if self.use_ref_hists
and tag_exists:
4538 tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4542 if self.use_ref_hists:
4543 ret_val = tag_exists
and tag_contains_ref_hist_key
4545 ret_val = tag_exists
4555 """Check if globaltag exists. 4559 self.logger.info(
"Checking existence of GlobalTag `%s'" % \
4561 self.logger.debug(
" (Using database connection `%s')" % \
4564 cmd =
"cmscond_tagtree_list -c %s -T %s" % \
4565 (connect_name, globaltag)
4566 (status, output) = commands.getstatusoutput(cmd)
4568 output.find(
"error") > -1:
4569 msg =
"Could not check existence of GlobalTag `%s' in `%s'" % \
4570 (globaltag, connect_name)
4571 if output.find(
".ALL_TABLES not found") > -1:
4573 "Missing database account `%s'" % \
4574 (msg, output.split(
".ALL_TABLES")[0].
split()[-1])
4575 self.logger.fatal(msg)
4576 self.logger.debug(
"Command used:")
4577 self.logger.debug(
" %s" % cmd)
4578 self.logger.debug(
"Output received:")
4579 self.logger.debug(output)
4581 if output.find(
"does not exist") > -1:
4582 self.logger.debug(
"GlobalTag `%s' does not exist in `%s':" % \
4583 (globaltag, connect_name))
4584 self.logger.debug(
"Output received:")
4585 self.logger.debug(output)
4589 self.logger.info(
" GlobalTag exists? -> %s" % tag_exists)
4597 """Check if globaltag contains the required RefHistos key. 4602 tag_contains_key =
None 4603 ref_hist_key =
"RefHistos" 4604 self.logger.info(
"Checking existence of reference " \
4605 "histogram key `%s' in GlobalTag `%s'" % \
4606 (ref_hist_key, globaltag))
4607 self.logger.debug(
" (Using database connection `%s')" % \
4609 cmd =
"cmscond_tagtree_list -c %s -T %s -n %s" % \
4610 (connect_name, globaltag, ref_hist_key)
4611 (status, output) = commands.getstatusoutput(cmd)
4613 output.find(
"error") > -1:
4614 msg =
"Could not check existence of key `%s'" % \
4615 (ref_hist_key, connect_name)
4616 self.logger.fatal(msg)
4617 self.logger.debug(
"Command used:")
4618 self.logger.debug(
" %s" % cmd)
4619 self.logger.debug(
"Output received:")
4620 self.logger.debug(
" %s" % output)
4623 self.logger.debug(
"Required key for use of reference " \
4624 "histograms `%s' does not exist " \
4625 "in GlobalTag `%s':" % \
4626 (ref_hist_key, globaltag))
4627 self.logger.debug(
"Output received:")
4628 self.logger.debug(output)
4629 tag_contains_key =
False 4631 tag_contains_key =
True 4633 self.logger.info(
" GlobalTag contains `%s' key? -> %s" % \
4634 (ref_hist_key, tag_contains_key))
4637 return tag_contains_key
4642 """Check the existence of tag_name in database connect_name. 4644 Check if tag_name exists as a reference histogram tag in the 4645 database given by self.frontier_connection_name['refhists']. 4649 connect_name = self.frontier_connection_name[
"refhists"]
4650 connect_name += self.db_account_name_cms_cond_dqm_summary()
4652 self.logger.debug(
"Checking existence of reference " \
4653 "histogram tag `%s'" % \
4655 self.logger.debug(
" (Using database connection `%s')" % \
4658 cmd =
"cmscond_list_iov -c %s" % \
4660 (status, output) = commands.getstatusoutput(cmd)
4662 msg =
"Could not check existence of tag `%s' in `%s'" % \
4663 (tag_name, connect_name)
4664 self.logger.fatal(msg)
4665 self.logger.debug(
"Command used:")
4666 self.logger.debug(
" %s" % cmd)
4667 self.logger.debug(
"Output received:")
4668 self.logger.debug(output)
4670 if not tag_name
in output.split():
4671 self.logger.debug(
"Reference histogram tag `%s' " \
4672 "does not exist in `%s'" % \
4673 (tag_name, connect_name))
4674 self.logger.debug(
" Existing tags: `%s'" % \
4675 "', `".
join(output.split()))
4679 self.logger.debug(
" Reference histogram tag exists? " \
4680 "-> %s" % tag_exists)
4688 """Build the es_prefer snippet for the reference histograms. 4690 The building of the snippet is wrapped in some care-taking 4691 code that figures out the name of the reference histogram set 4692 and makes sure the corresponding tag exists. 4698 ref_hist_tag_name = self.ref_hist_mappings[dataset_name]
4700 connect_name = self.frontier_connection_name[
"refhists"]
4701 connect_name += self.db_account_name_cms_cond_dqm_summary()
4702 record_name =
"DQMReferenceHistogramRootFileRcd" 4706 code_lines.append(
"from CondCore.DBCommon.CondDBSetup_cfi import *")
4707 code_lines.append(
"process.ref_hist_source = cms.ESSource(\"PoolDBESSource\", CondDBSetup,")
4708 code_lines.append(
" connect = cms.string(\"%s\")," % connect_name)
4709 code_lines.append(
" toGet = cms.VPSet(cms.PSet(record = cms.string(\"%s\")," % record_name)
4710 code_lines.append(
" tag = cms.string(\"%s\"))," % ref_hist_tag_name)
4711 code_lines.append(
" )")
4712 code_lines.append(
" )")
4713 code_lines.append(
"process.es_prefer_ref_hist_source = cms.ESPrefer(\"PoolDBESSource\", \"ref_hist_source\")")
4715 snippet =
"\n".
join(code_lines)
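
    # For reference, the generated snippet looks roughly like this
    # (the connection string and tag name below are placeholders):
    #
    #   from CondCore.DBCommon.CondDBSetup_cfi import *
    #   process.ref_hist_source = cms.ESSource("PoolDBESSource", CondDBSetup,
    #                             connect = cms.string("frontier://.../<dqm summary account>"),
    #                             toGet = cms.VPSet(cms.PSet(record = cms.string("DQMReferenceHistogramRootFileRcd"),
    #                                                        tag = cms.string("<ref_hist_tag_name>")),
    #                                               )
    #                             )
    #   process.es_prefer_ref_hist_source = cms.ESPrefer("PoolDBESSource", "ref_hist_source")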
4723 """Create the Python harvesting configuration for harvesting. 4725 The basic configuration is created by 4726 Configuration.PyReleaseValidation.ConfigBuilder. (This mimics 4727 what cmsDriver.py does.) After that we add some specials 4730 NOTE: On one hand it may not be nice to circumvent 4731 cmsDriver.py, on the other hand cmsDriver.py does not really 4732 do anything itself. All the real work is done by the 4733 ConfigBuilder so there is not much risk that we miss out on 4734 essential developments of cmsDriver in the future. 4739 config_options = defaultOptions
4744 config_options.name =
"harvesting" 4745 config_options.scenario =
"pp" 4746 config_options.number = 1
4747 config_options.arguments = self.ident_string()
4748 config_options.evt_type = config_options.name
4749 config_options.customisation_file =
None 4750 config_options.filein =
"dummy_value" 4751 config_options.filetype =
"EDM" 4753 config_options.gflash =
"dummy_value" 4757 config_options.dbsquery =
"" 4764 config_options.step =
"HARVESTING:%s" % \
4765 self.harvesting_info[self.harvesting_type] \
4767 config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4769 config_options.eventcontent = self.harvesting_info \
4770 [self.harvesting_type] \
4772 config_options.harvesting = self.harvesting_info \
4773 [self.harvesting_type] \
4780 datatype = self.datasets_information[dataset_name][
"datatype"]
4781 config_options.isMC = (datatype.lower() ==
"mc")
4782 config_options.isData = (datatype.lower() ==
"data")
4783 globaltag = self.datasets_information[dataset_name][
"globaltag"]
4785 config_options.conditions = self.format_conditions_string(globaltag)
4789 if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4791 config_builder =
ConfigBuilder(config_options, with_input=
True)
4795 config_builder.prepare(
True)
4796 config_contents = config_builder.pythonCfgCode
4805 marker_lines.append(sep)
4806 marker_lines.append(
"# Code between these markers was generated by")
4807 marker_lines.append(
"# Configuration.PyReleaseValidation." \
4810 marker_lines.append(sep)
4811 marker =
"\n".
join(marker_lines)
4813 tmp = [self.config_file_header()]
4817 tmp.append(config_contents)
4821 config_contents =
"\n".
join(tmp)
4826 customisations = [
""]
4828 customisations.append(
"# Now follow some customisations")
4829 customisations.append(
"")
4830 connect_name = self.frontier_connection_name[
"globaltag"]
4831 connect_name += self.db_account_name_cms_cond_globaltag()
4832 customisations.append(
"process.GlobalTag.connect = \"%s\"" % \
4836 if self.saveByLumiSection ==
True:
4837 customisations.append(
"process.dqmSaver.saveByLumiSection = 1")
4841 customisations.append(
"")
4855 use_es_prefer = (self.harvesting_type ==
"RelVal")
4856 use_refs = use_es_prefer
or \
4857 (
not self.harvesting_type ==
"MC")
4859 use_refs = use_refs
and self.use_ref_hists
4867 customisations.append(
"print \"Not using reference histograms\"")
4868 customisations.append(
"if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4869 customisations.append(
" for (sequence_name, sequence) in six.iteritems(process.sequences):")
4870 customisations.append(
" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4871 customisations.append(
" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4872 customisations.append(
" sequence_name")
4873 customisations.append(
"process.dqmSaver.referenceHandling = \"skip\"")
4877 customisations.append(
"process.dqmSaver.referenceHandling = \"all\"")
4879 es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4880 customisations.append(es_prefer_snippet)
4884 workflow_name = dataset_name
4885 if self.harvesting_mode ==
"single-step-allow-partial":
4886 workflow_name +=
"_partial" 4887 customisations.append(
"process.dqmSaver.workflow = \"%s\"" % \
4924 config_contents = config_contents +
"\n".
join(customisations)
4929 return config_contents
    def create_me_extraction_config(self, dataset_name):
        """Create the Python configuration for the ME-extraction step
        of two-step harvesting.

        """

        tmp = []

        tmp.append(self.config_file_header())
        tmp.append("")
        tmp.append("import FWCore.ParameterSet.Config as cms")
        tmp.append("")
        tmp.append("process = cms.Process(\"ME2EDM\")")
        tmp.append("")
        tmp.append("# Import of standard configurations")
        tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
        tmp.append("")
        tmp.append("# We don't really process any events, just keep this set to one to")
        tmp.append("# make sure things work.")
        tmp.append("process.maxEvents = cms.untracked.PSet(")
        tmp.append("    input = cms.untracked.int32(1)")
        tmp.append("    )")
        tmp.append("")
        tmp.append("process.options = cms.untracked.PSet(")
        tmp.append("    Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
        tmp.append("    )")
        tmp.append("")
        tmp.append("process.source = cms.Source(\"PoolSource\",")
        tmp.append("    processingMode = \\")
        tmp.append("    cms.untracked.string(\"RunsAndLumis\"),")
        tmp.append("    fileNames = \\")
        tmp.append("    cms.untracked.vstring(\"no_file_specified\")")
        tmp.append("    )")
        tmp.append("")
        tmp.append("# Output definition: drop everything except for the monitoring.")
        tmp.append("process.output = cms.OutputModule(")
        tmp.append("    \"PoolOutputModule\",")
        tmp.append("    outputCommands = \\")
        tmp.append("    cms.untracked.vstring(\"drop *\", \\")
        tmp.append("                          \"keep *_MEtoEDMConverter_*_*\"),")
        output_file_name = self. \
                           create_output_file_name(dataset_name)
        tmp.append("    fileName = \\")
        tmp.append("    cms.untracked.string(\"%s\")," % output_file_name)
        tmp.append("    dataset = cms.untracked.PSet(")
        tmp.append("        dataTier = cms.untracked.string(\"RECO\"),")
        tmp.append("        filterName = cms.untracked.string(\"\")")
        tmp.append("        )")
        tmp.append("    )")
        tmp.append("")
        tmp.append("# Additional output definition")
        tmp.append("process.out_step = cms.EndPath(process.output)")
        tmp.append("")
        tmp.append("# Schedule definition")
        tmp.append("process.schedule = cms.Schedule(process.out_step)")
        tmp.append("")

        config_contents = "\n".join(tmp)

        # Return the ME-extraction configuration.
        return config_contents
5046 """Write a CRAB job configuration Python file. 5050 self.logger.info(
"Writing CRAB configuration...")
5052 file_name_base =
"crab.cfg" 5055 crab_contents = self.create_crab_config()
5058 crab_file_name = file_name_base
5060 crab_file =
file(crab_file_name,
"w")
5061 crab_file.write(crab_contents)
5064 self.logger.fatal(
"Could not write " \
5065 "CRAB configuration to file `%s'" % \
5067 raise Error(
"ERROR: Could not write to file `%s'!" % \
5075 """Write a multi-CRAB job configuration Python file. 5079 self.logger.info(
"Writing multi-CRAB configuration...")
5081 file_name_base =
"multicrab.cfg" 5084 multicrab_contents = self.create_multicrab_config()
5087 multicrab_file_name = file_name_base
5089 multicrab_file =
file(multicrab_file_name,
"w")
5090 multicrab_file.write(multicrab_contents)
5091 multicrab_file.close()
5093 self.logger.fatal(
"Could not write " \
5094 "multi-CRAB configuration to file `%s'" % \
5095 multicrab_file_name)
5096 raise Error(
"ERROR: Could not write to file `%s'!" % \
5097 multicrab_file_name)
5104 """Write a harvesting job configuration Python file. 5106 NOTE: This knows nothing about single-step or two-step 5107 harvesting. That's all taken care of by 5108 create_harvesting_config. 5112 self.logger.debug(
"Writing harvesting configuration for `%s'..." % \
5116 config_contents = self.create_harvesting_config(dataset_name)
5119 config_file_name = self. \
5122 config_file =
file(config_file_name,
"w")
5123 config_file.write(config_contents)
5126 self.logger.fatal(
"Could not write " \
5127 "harvesting configuration to file `%s'" % \
5129 raise Error(
"ERROR: Could not write to file `%s'!" % \
5137 """Write an ME-extraction configuration Python file. 5139 This `ME-extraction' (ME = Monitoring Element) is the first 5140 step of the two-step harvesting. 5144 self.logger.debug(
"Writing ME-extraction configuration for `%s'..." % \
5148 config_contents = self.create_me_extraction_config(dataset_name)
5151 config_file_name = self. \
5154 config_file =
file(config_file_name,
"w")
5155 config_file.write(config_contents)
5158 self.logger.fatal(
"Could not write " \
5159 "ME-extraction configuration to file `%s'" % \
5161 raise Error(
"ERROR: Could not write to file `%s'!" % \
5170 """Check if we need to load and check the reference mappings. 5172 For data the reference histograms should be taken 5173 automatically from the GlobalTag, so we don't need any 5174 mappings. For RelVals we need to know a mapping to be used in 5175 the es_prefer code snippet (different references for each of 5178 WARNING: This implementation is a bit convoluted. 5184 if not dataset_name
is None:
5185 data_type = self.datasets_information[dataset_name] \
5187 mappings_needed = (data_type ==
"mc")
5189 if not mappings_needed:
5190 assert data_type ==
"data" 5193 tmp = [self.ref_hist_mappings_needed(dataset_name) \
5194 for dataset_name
in \
5195 self.datasets_information.keys()]
5196 mappings_needed = (
True in tmp)
5199 return mappings_needed
5204 """Load the reference histogram mappings from file. 5206 The dataset name to reference histogram name mappings are read 5207 from a text file specified in self.ref_hist_mappings_file_name. 5212 assert len(self.ref_hist_mappings) < 1, \
5213 "ERROR Should not be RE-loading " \
5214 "reference histogram mappings!" 5217 self.logger.info(
"Loading reference histogram mappings " \
5218 "from file `%s'" % \
5219 self.ref_hist_mappings_file_name)
5221 mappings_lines =
None 5223 mappings_file =
file(self.ref_hist_mappings_file_name,
"r") 5224 mappings_lines = mappings_file.readlines() 5225 mappings_file.close() 5227 msg =
"ERROR: Could not open reference histogram mapping "\
5228 "file `%s'" % self.ref_hist_mappings_file_name
5229 self.logger.fatal(msg)
5239 for mapping
in mappings_lines:
5241 if not mapping.startswith(
"#"):
5242 mapping = mapping.strip()
5243 if len(mapping) > 0:
5244 mapping_pieces = mapping.split()
5245 if len(mapping_pieces) != 2:
5246 msg =
"ERROR: The reference histogram mapping " \
5247 "file contains a line I don't " \
5248 "understand:\n %s" % mapping
5249 self.logger.fatal(msg)
5251 dataset_name = mapping_pieces[0].
strip()
5252 ref_hist_name = mapping_pieces[1].
strip()
5256 if dataset_name
in self.ref_hist_mappings:
5257 msg =
"ERROR: The reference histogram mapping " \
5258 "file contains multiple mappings for " \
5260 self.logger.fatal(msg)
5264 self.ref_hist_mappings[dataset_name] = ref_hist_name
5268 self.logger.info(
" Successfully loaded %d mapping(s)" % \
5269 len(self.ref_hist_mappings))
5270 max_len =
max([len(i)
for i
in self.ref_hist_mappings.keys()])
5271 for (map_from, map_to)
in six.iteritems(self.ref_hist_mappings):
5272 self.logger.info(
" %-*s -> %s" % \
5273 (max_len, map_from, map_to))
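
    # The mappings file read above is a plain text file with two
    # whitespace-separated columns per line: the dataset name and the
    # name of the reference histogram set to use for it. Lines
    # starting with `#' are ignored. A (hypothetical) example:
    #
    #   # dataset name                                          reference set
    #   /RelValMinBias/CMSSW_3_8_2-START38_V9-v1/GEN-SIM-RECO   ref_minbias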
5280 """Make sure all necessary reference histograms exist. 5282 Check that for each of the datasets to be processed a 5283 reference histogram is specified and that that histogram 5284 exists in the database. 5286 NOTE: There's a little complication here. Since this whole 5287 thing was designed to allow (in principle) harvesting of both 5288 data and MC datasets in one go, we need to be careful to check 5289 the availability fof reference mappings only for those 5290 datasets that need it. 5294 self.logger.info(
"Checking reference histogram mappings")
5296 for dataset_name
in self.datasets_to_use:
5298 ref_hist_name = self.ref_hist_mappings[dataset_name]
5300 msg =
"ERROR: No reference histogram mapping found " \
5301 "for dataset `%s'" % \
5303 self.logger.fatal(msg)
5306 if not self.check_ref_hist_tag(ref_hist_name):
5307 msg =
"Reference histogram tag `%s' " \
5308 "(used for dataset `%s') does not exist!" % \
5309 (ref_hist_name, dataset_name)
5310 self.logger.fatal(msg)
5313 self.logger.info(
" Done checking reference histogram mappings.")
5320 """Obtain all information on the datasets that we need to run. 5322 Use DBS to figure out all required information on our 5323 datasets, like the run numbers and the GlobalTag. All 5324 information is stored in the datasets_information member 5339 self.datasets_information = {}
5340 self.logger.info(
"Collecting information for all datasets to process")
5341 dataset_names = sorted(self.datasets_to_use.keys())
5342 for dataset_name
in dataset_names:
5346 self.logger.info(sep_line)
5347 self.logger.info(
" `%s'" % dataset_name)
5348 self.logger.info(sep_line)
5350 runs = self.dbs_resolve_runs(dataset_name)
5351 self.logger.info(
" found %d run(s)" % len(runs))
5353 self.logger.debug(
" run number(s): %s" % \
5354 ", ".
join([
str(i)
for i
in runs]))
5358 self.logger.warning(
" --> skipping dataset " 5360 assert False,
"Panic: found a dataset without runs " \
5364 cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5365 self.logger.info(
" found CMSSW version `%s'" % cmssw_version)
5368 datatype = self.dbs_resolve_datatype(dataset_name)
5369 self.logger.info(
" sample is data or MC? --> %s" % \
5375 if self.globaltag
is None:
5376 globaltag = self.dbs_resolve_globaltag(dataset_name)
5378 globaltag = self.globaltag
5380 self.logger.info(
" found GlobalTag `%s'" % globaltag)
5386 assert datatype ==
"data", \
5387 "ERROR Empty GlobalTag for MC dataset!!!" 5395 sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5399 for run_number
in sites_catalog.keys():
5400 num_events[run_number] = sites_catalog \
5401 [run_number][
"all_sites"]
5402 del sites_catalog[run_number][
"all_sites"]
5407 for run_number
in sites_catalog.keys():
5408 mirror_catalog[run_number] = sites_catalog \
5409 [run_number][
"mirrored"]
5410 del sites_catalog[run_number][
"mirrored"]
5439 self.datasets_information[dataset_name] = {}
5440 self.datasets_information[dataset_name][
"runs"] = runs
5441 self.datasets_information[dataset_name][
"cmssw_version"] = \
5443 self.datasets_information[dataset_name][
"globaltag"] = globaltag
5444 self.datasets_information[dataset_name][
"datatype"] = datatype
5445 self.datasets_information[dataset_name][
"num_events"] = num_events
5446 self.datasets_information[dataset_name][
"mirrored"] = mirror_catalog
5447 self.datasets_information[dataset_name][
"sites"] = sites_catalog
5451 castor_path_common = self.create_castor_path_name_common(dataset_name)
5452 self.logger.info(
" output will go into `%s'" % \
5456 [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5458 for path_name
in castor_paths.values():
5459 self.logger.debug(
" %s" % path_name)
5460 self.datasets_information[dataset_name][
"castor_path"] = \
5468 """Tell the user what to do now, after this part is done. 5470 This should provide the user with some (preferably 5471 copy-pasteable) instructions on what to do now with the setups 5472 and files that have been created. 5482 self.logger.info(
"")
5483 self.logger.info(sep_line)
5484 self.logger.info(
" Configuration files have been created.")
5485 self.logger.info(
" From here on please follow the usual CRAB instructions.")
5486 self.logger.info(
" Quick copy-paste instructions are shown below.")
5487 self.logger.info(sep_line)
5489 self.logger.info(
"")
5490 self.logger.info(
" Create all CRAB jobs:")
5491 self.logger.info(
" multicrab -create")
5492 self.logger.info(
"")
5493 self.logger.info(
" Submit all CRAB jobs:")
5494 self.logger.info(
" multicrab -submit")
5495 self.logger.info(
"")
5496 self.logger.info(
" Check CRAB status:")
5497 self.logger.info(
" multicrab -status")
5498 self.logger.info(
"")
5500 self.logger.info(
"")
5501 self.logger.info(
" For more information please see the CMS Twiki:")
5502 self.logger.info(
" %s" % twiki_url)
5503 self.logger.info(sep_line)
5507 if not self.all_sites_found:
5508 self.logger.warning(
" For some of the jobs no matching " \
5509 "site could be found")
5510 self.logger.warning(
" --> please scan your multicrab.cfg" \
5511 "for occurrences of `%s'." % \
5512 self.no_matching_site_found_str)
5513 self.logger.warning(
" You will have to fix those " \
5521 "Main entry point of the CMS harvester." 5531 self.parse_cmd_line_options()
5533 self.check_input_status()
5546 self.setup_harvesting_info()
5549 self.build_dataset_use_list()
5551 self.build_dataset_ignore_list()
5554 self.build_runs_use_list()
5555 self.build_runs_ignore_list()
5562 self.process_dataset_ignore_list()
5566 self.build_datasets_information()
5568 if self.use_ref_hists
and \
5569 self.ref_hist_mappings_needed():
5572 self.load_ref_hist_mappings()
5576 self.check_ref_hist_mappings()
5578 self.logger.info(
"No need to load reference " \
5579 "histogram mappings file")
5594 self.process_runs_use_and_ignore_lists()
5599 if self.harvesting_mode ==
"single-step-allow-partial":
5600 self.singlify_datasets()
5603 self.check_dataset_list()
5605 if len(self.datasets_to_use) < 1:
5606 self.logger.info(
"After all checks etc. " \
5607 "there are no datasets (left?) " \
5611 self.logger.info(
"After all checks etc. we are left " \
5612 "with %d dataset(s) to process " \
5613 "for a total of %d runs" % \
5614 (len(self.datasets_to_use),
5615 sum([len(i)
for i
in \
5616 self.datasets_to_use.values()])))
5643 self.create_and_check_castor_dirs()
5647 self.write_crab_config()
5648 self.write_multicrab_config()
5658 for dataset_name
in self.datasets_to_use.keys():
5660 self.write_harvesting_config(dataset_name)
5661 if self.harvesting_mode ==
"two-step":
5662 self.write_me_extraction_config(dataset_name)
5670 for run_number
in self.datasets_to_use[dataset_name]:
5671 tmp[run_number] = self.datasets_information \
5672 [dataset_name][
"num_events"][run_number]
5673 if dataset_name
in self.book_keeping_information:
5674 self.book_keeping_information[dataset_name].
update(tmp)
5676 self.book_keeping_information[dataset_name] = tmp
5679 self.show_exit_message()
5681 except Usage
as err:
5686 except Error
as err:
5690 except Exception
as err:
5699 if isinstance(err, SystemExit):
5700 self.logger.fatal(err.code)
5701 elif not isinstance(err, KeyboardInterrupt):
5702 self.logger.fatal(
"!" * 50)
5703 self.logger.fatal(
" This looks like a serious problem.")
5704 self.logger.fatal(
" If you are sure you followed all " \
5706 self.logger.fatal(
" please copy the below stack trace together")
5707 self.logger.fatal(
" with a description of what you were doing to")
5708 self.logger.fatal(
" jeroen.hegeman@cern.ch.")
5709 self.logger.fatal(
" %s" % self.ident_string())
5710 self.logger.fatal(
"!" * 50)
5711 self.logger.fatal(
str(err))
5713 traceback_string = traceback.format_exc()
5714 for line
in traceback_string.split(
"\n"):
5715 self.logger.fatal(line)
5716 self.logger.fatal(
"!" * 50)
5731 if self.crab_submission ==
True:
5732 os.system(
"multicrab -create")
5733 os.system(
"multicrab -submit")
5744 if __name__ ==
"__main__":
5745 "Main entry point for harvesting." def create_crab_config(self)