15 """Main program to run all kinds of harvesting. 17 These are the basic kinds of harvesting implemented (contact me if 18 your favourite is missing): 20 - RelVal : Run for release validation samples. Makes heavy use of MC 23 - RelValFS: FastSim RelVal. 25 - MC : Run for MC samples. 27 - DQMOffline : Run for real data (could also be run for MC). 29 For the mappings of these harvesting types to sequence names please 30 see the setup_harvesting_info() and option_handler_list_types() 37 __version__ =
"3.8.2p1" 38 __author__ =
"Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
39 "Niklas Pietsch (niklas.pietsch@desy.de)" 41 twiki_url =
"https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester" 97 from inspect
import getargspec
98 from random
import choice
102 from DBSAPI.dbsApi
import DbsApi
105 from functools
import reduce
108 global SAXParseException
110 from xml.sax
import SAXParseException
112 import Configuration.PyReleaseValidation
113 from Configuration.PyReleaseValidation.ConfigBuilder
import \
114 ConfigBuilder, defaultOptions
        return repr(self.msg)

        return repr(self.msg)
151 """Helper class to add some customised help output to cmsHarvester. 153 We want to add some instructions, as well as a pointer to the CMS 163 usage_lines.append(sep_line)
164 usage_lines.append(
"Welcome to the CMS harvester, a (hopefully useful)")
165 usage_lines.append(
"tool to create harvesting configurations.")
166 usage_lines.append(
"For more information please have a look at the CMS Twiki:")
167 usage_lines.append(
" %s" % twiki_url)
168 usage_lines.append(sep_line)
169 usage_lines.append(
"")
173 usage_lines.append(optparse.IndentedHelpFormatter. \
176 formatted_usage =
"\n".
join(usage_lines)
177 return formatted_usage
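        # --- Usage sketch (not part of the original file) -------------------
        # The formatter above is meant to be handed to optparse when the
        # option parser is built further down. The class name used here is an
        # assumption (it is not visible in the surviving fragments):
        #
        #     parser = optparse.OptionParser(formatter=CMSHarvesterHelpFormatter())
        #     parser.print_help()
        # ---------------------------------------------------------------------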
186 """XML handler class to parse DBS results. 188 The tricky thing here is that older DBS versions (2.0.5 and 189 earlier) return results in a different XML format than newer 190 versions. Previously the result values were returned as attributes 191 to the `result' element. The new approach returns result values as 192 contents of named elements. 194 The old approach is handled directly in startElement(), the new 195 approach in characters(). 197 NOTE: All results are returned in the form of string values of 208 "dataset.tag" :
"PROCESSEDDATASET_GLOBALTAG",
209 "datatype.type" :
"PRIMARYDSTYPE_TYPE",
210 "run" :
"RUNS_RUNNUMBER",
211 "run.number" :
"RUNS_RUNNUMBER",
212 "file.name" :
"FILES_LOGICALFILENAME",
213 "file.numevents" :
"FILES_NUMBEROFEVENTS",
214 "algo.version" :
"APPVERSION_VERSION",
215 "site" :
"STORAGEELEMENT_SENAME",
226 self.element_position.append(name)
235 key = DBSXMLHandler.mapping[name]
236 value =
str(attrs[key])
246 "closing unopenend element `%s'" % name
255 self.element_position.pop()
263 self.current_value.append(content)
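        # --- Usage sketch (not part of the original file) -------------------
        # How this handler is driven elsewhere in this file (see the
        # dbs_resolve_* methods below): construct it with the list of result
        # names you expect, feed it a DBS XML reply, then read the parallel
        # result arrays from handler.results. `api_result' is a hypothetical
        # DBS reply string here.
        #
        #     handler = DBSXMLHandler(["dataset"])
        #     parser = xml.sax.make_parser()
        #     parser.setContentHandler(handler)
        #     xml.sax.parseString(api_result, handler)
        #     assert handler.check_results_validity()
        #     datasets = handler.results["dataset"]
        # ---------------------------------------------------------------------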
269 """Make sure that all results arrays have equal length. 271 We should have received complete rows from DBS. I.e. all 272 results arrays in the handler should be of equal length. 278 res_names = self.results.keys()
279 if len(res_names) > 1:
280 for res_name
in res_names[1:]:
281 res_tmp = self.
results[res_name]
282 if len(res_tmp) != len(self.
results[res_names[0]]):
283 results_valid =
False 294 """Class to perform CMS harvesting. 296 More documentation `obviously' to follow. 303 "Initialize class and process command line options." 330 "single-step-allow-partial",
        for key in self.frontier_connection_name.keys():

        self.Jsonlumi = False

        self.castor_base_dir = None
        self.castor_base_dir_default = "/castor/cern.ch/" \
                                       "dqm/offline/harvesting_output/"

        self.book_keeping_file_name_default = "harvesting_accounting.txt"

        self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"

        self.castor_prefix = "/castor/cern.ch"

        self.non_t1access = False
        self.caf_access = False
        self.saveByLumiSection = False
        self.crab_submission = False
        self.nr_max_sites = 1
        self.preferred_site = "no preference"

        self.datasets_to_use = {}
        self.datasets_to_ignore = {}
        self.book_keeping_information = {}
        self.ref_hist_mappings = {}

        self.runs_to_use = {}
        self.runs_to_ignore = {}

        self.sites_and_versions_cache = {}
        self.globaltag_check_cache = []

        self.all_sites_found = True
        self.no_matching_site_found_str = "no_matching_site_found"

        if cmd_line_opts is None:
            cmd_line_opts = sys.argv[1:]
        self.cmd_line_opts = cmd_line_opts

        log_handler = logging.StreamHandler()
        log_formatter = logging.Formatter("%(message)s")
        log_handler.setFormatter(log_formatter)
        logger = logging.getLogger()
        logger.addHandler(log_handler)
507 "Clean up after ourselves." 519 "Create a timestamp to use in the created config files." 521 time_now = datetime.datetime.utcnow()
523 time_now = time_now.replace(microsecond = 0)
524 time_stamp =
"%sUTC" % datetime.datetime.isoformat(time_now)
532 "Spit out an identification string for cmsHarvester.py." 534 ident_str =
"`cmsHarvester.py " \
535 "version %s': cmsHarvester.py %s" % \
537 reduce(
lambda x, y: x+
' '+y, sys.argv[1:]))
544 """Create the conditions string needed for `cmsDriver'. 546 Just glueing the FrontierConditions bit in front of it really. 557 if globaltag.lower().
find(
"conditions") > -1:
558 conditions_string = globaltag
560 conditions_string =
"FrontierConditions_GlobalTag,%s" % \
564 return conditions_string
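        # --- Example (not part of the original file) -------------------------
        # What the helper above produces, following the logic just shown (the
        # GlobalTag name is purely illustrative):
        #
        #     "GR10_P_V4::All"
        #         --> "FrontierConditions_GlobalTag,GR10_P_V4::All"
        #     "FrontierConditions_GlobalTag,GR10_P_V4::All"
        #         --> returned unchanged (it already contains "conditions")
        # ---------------------------------------------------------------------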
569 """Return the database account name used to store the GlobalTag. 571 The name of the database account depends (albeit weakly) on 572 the CMSSW release version. 578 account_name =
"CMS_COND_31X_GLOBALTAG" 586 """See db_account_name_cms_cond_globaltag.""" 589 version = self.cmssw_version[6:11]
590 if version <
"3_4_0":
591 account_name =
"CMS_COND_31X_DQM_SUMMARY" 593 account_name =
"CMS_COND_34X" 601 "Create a nice header to be used to mark the generated files." 607 tmp.append(
"# %s" % time_stamp)
608 tmp.append(
"# WARNING: This file was created automatically!")
610 tmp.append(
"# Created by %s" % ident_str)
612 header =
"\n".
join(tmp)
620 """Adjust the level of output generated. 623 - normal : default level of output 624 - quiet : less output than the default 625 - verbose : some additional information 626 - debug : lots more information, may be overwhelming 628 NOTE: The debug option is a bit special in the sense that it 629 also modifies the output format. 636 "NORMAL" : logging.INFO,
637 "QUIET" : logging.WARNING,
638 "VERBOSE" : logging.INFO,
639 "DEBUG" : logging.DEBUG
642 output_level = output_level.upper()
650 self.logger.fatal(
"Unknown output level `%s'" % ouput_level)
659 """Switch to debug mode. 661 This both increases the amount of output generated, as well as 662 changes the format used (more detailed information is given). 667 log_formatter_debug = logging.Formatter(
"[%(levelname)s] " \
677 log_handler = self.logger.handlers[0]
678 log_handler.setFormatter(log_formatter_debug)
686 "Switch to quiet mode: less verbose." 695 """Switch on `force mode' in which case we don't brake for nobody. 697 In so-called `force mode' all sanity checks are performed but 698 we don't halt on failure. Of course this requires some care 703 self.logger.debug(
"Switching on `force mode'.")
711 """Set the harvesting type to be used. 713 This checks that no harvesting type is already set, and sets 714 the harvesting type to be used to the one specified. If a 715 harvesting type is already set an exception is thrown. The 716 same happens when an unknown type is specified. 725 value = value.lower()
728 type_index = harvesting_types_lowered.index(value)
732 self.logger.fatal(
"Unknown harvesting type `%s'" % \
734 self.logger.fatal(
" possible types are: %s" %
736 raise Usage(
"Unknown harvesting type `%s'" % \
742 msg =
"Only one harvesting type should be specified" 743 self.logger.fatal(msg)
747 self.logger.info(
"Harvesting type to be used: `%s'" % \
755 """Set the harvesting mode to be used. 757 Single-step harvesting can be used for samples that are 758 located completely at a single site (= SE). Otherwise use 764 harvesting_mode = value.lower()
766 msg =
"Unknown harvesting mode `%s'" % harvesting_mode
767 self.logger.fatal(msg)
768 self.logger.fatal(
" possible modes are: %s" % \
775 msg =
"Only one harvesting mode should be specified" 776 self.logger.fatal(msg)
780 self.logger.info(
"Harvesting mode to be used: `%s'" % \
788 """Set the GlobalTag to be used, overriding our own choices. 790 By default the cmsHarvester will use the GlobalTag with which 791 a given dataset was created also for the harvesting. The 792 --globaltag option is the way to override this behaviour. 798 msg =
"Only one GlobalTag should be specified" 799 self.logger.fatal(msg)
803 self.logger.info(
"GlobalTag to be used: `%s'" % \
811 "Switch use of all reference histograms off." 815 self.logger.warning(
"Switching off all use of reference histograms")
823 """Override the default Frontier connection string. 825 Please only use this for testing (e.g. when a test payload has 826 been inserted into cms_orc_off instead of cms_orc_on). 828 This method gets called for three different command line 830 - --frontier-connection, 831 - --frontier-connection-for-globaltag, 832 - --frontier-connection-for-refhists. 833 Appropriate care has to be taken to make sure things are only 839 frontier_type = opt_str.split(
"-")[-1]
840 if frontier_type ==
"connection":
842 frontier_types = self.frontier_connection_name.keys()
844 frontier_types = [frontier_type]
848 for connection_name
in frontier_types:
850 msg =
"Please specify either:\n" \
851 " `--frontier-connection' to change the " \
852 "Frontier connection used for everything, or\n" \
853 "either one or both of\n" \
854 " `--frontier-connection-for-globaltag' to " \
855 "change the Frontier connection used for the " \
857 " `--frontier-connection-for-refhists' to change " \
858 "the Frontier connection used for the " \
859 "reference histograms." 860 self.logger.fatal(msg)
863 frontier_prefix =
"frontier://" 864 if not value.startswith(frontier_prefix):
865 msg =
"Expecting Frontier connections to start with " \
866 "`%s'. You specified `%s'." % \
867 (frontier_prefix, value)
868 self.logger.fatal(msg)
872 if value.find(
"FrontierProd") < 0
and \
873 value.find(
"FrontierProd") < 0:
874 msg =
"Expecting Frontier connections to contain either " \
875 "`FrontierProd' or `FrontierPrep'. You specified " \
876 "`%s'. Are you sure?" % \
878 self.logger.warning(msg)
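        # --- Example (not part of the original file) -------------------------
        # A connection string that passes the checks above would look roughly
        # like the following (the account name is illustrative, reusing the
        # CMS_COND_31X_GLOBALTAG name mentioned earlier in this file; the
        # exact path layout is an assumption):
        #
        #     --frontier-connection-for-globaltag \
        #         frontier://FrontierProd/CMS_COND_31X_GLOBALTAG/
        #
        # It must start with "frontier://", should mention FrontierProd or
        # FrontierPrep, and (see below) gets a trailing "/" appended if missing.
        # ---------------------------------------------------------------------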
        if not value.endswith("/"):
            value += "/"

        for connection_name in frontier_types:

            frontier_type_str = "unknown"
            if connection_name == "globaltag":
                frontier_type_str = "the GlobalTag"
            elif connection_name == "refhists":
                frontier_type_str = "the reference histograms"

            self.logger.warning("Overriding default Frontier " \
                                "connection for %s " \
        if opt_str.lower().find("ignore") > -1:
            spec_type = "ignore"
        else:
            spec_type = "use"

        if opt_str.lower().find("dataset") > -1:
            select_type = "datasets"
        else:
            select_type = "runs"

        if not self.input_method[select_type][spec_type] is None:
            msg = "Please only specify one input method " \
                  "(for the `%s' case)" % opt_str
            self.logger.fatal(msg)

        input_method = opt_str.replace("-", "").replace("ignore", "")
        self.input_method[select_type][spec_type] = input_method
        self.input_name[select_type][spec_type] = value

        self.logger.debug("Input method for the `%s' case: %s" % \
                          (spec_type, input_method))
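        # --- Example (not part of the original file) -------------------------
        # How an option string is decoded by the logic above (based on the
        # reconstructed branches):
        #
        #     opt_str = "--datasetfile-ignore"
        #         select_type  -> "datasets"    (contains "dataset")
        #         spec_type    -> "ignore"      (contains "ignore")
        #         input_method -> "datasetfile" ("-" and "ignore" stripped)
        #
        #     opt_str = "--runs"
        #         select_type -> "runs", spec_type -> "use", input_method -> "runs"
        # ---------------------------------------------------------------------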
961 """Store the name of the file to be used for book keeping. 963 The only check done here is that only a single book keeping 971 msg =
"Only one book keeping file should be specified" 972 self.logger.fatal(msg)
976 self.logger.info(
"Book keeping file to be used: `%s'" % \
984 """Store the name of the file for the ref. histogram mapping. 991 msg =
"Only one reference histogram mapping file " \
992 "should be specified" 993 self.logger.fatal(msg)
997 self.logger.info(
"Reference histogram mapping file " \
998 "to be used: `%s'" % \
1060 """Specify where on CASTOR the output should go. 1062 At the moment only output to CERN CASTOR is 1063 supported. Eventually the harvested results should go into the 1064 central place for DQM on CASTOR anyway. 1071 castor_prefix = self.castor_prefix
1074 castor_dir = os.path.join(os.path.sep, castor_dir)
1075 self.castor_base_dir = os.path.normpath(castor_dir)
1077 self.logger.info(
"CASTOR (base) area to be used: `%s'" % \
1078 self.castor_base_dir)
1085 """Set the self.no_t1access flag to try and create jobs that 1086 run without special `t1access' role. 1090 self.non_t1access =
True 1092 self.logger.warning(
"Running in `non-t1access' mode. " \
1093 "Will try to create jobs that run " \
1094 "without special rights but no " \
1095 "further promises...")
1102 """Set the self.caf_access flag to try and create jobs that 1106 self.caf_access =
True 1108 self.logger.warning(
"Running in `caf_access' mode. " \
1109 "Will try to create jobs that run " \
1111 "further promises...")
1118 """Set process.dqmSaver.saveByLumiSectiont=1 in cfg harvesting file 1120 self.saveByLumiSection =
True 1122 self.logger.warning(
"waning concerning saveByLumiSection option")
1130 """Crab jobs are not created and 1131 "submitted automatically", 1133 self.crab_submission =
True 1141 self.nr_max_sites = value
1147 self.preferred_site = value
1152 """List all harvesting types and their mappings. 1154 This lists all implemented harvesting types with their 1155 corresponding mappings to sequence names. This had to be 1156 separated out from the help since it depends on the CMSSW 1157 version and was making things a bit of a mess. 1159 NOTE: There is no way (at least not that I could come up with) 1160 to code this in a neat generic way that can be read both by 1161 this method and by setup_harvesting_info(). Please try hard to 1162 keep these two methods in sync! 1167 sep_line_short =
"-" * 20
1170 print "The following harvesting types are available:" 1173 print "`RelVal' maps to:" 1174 print " pre-3_3_0 : HARVESTING:validationHarvesting" 1175 print " 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting" 1176 print " Exceptions:" 1177 print " 3_3_0_pre1-4 : HARVESTING:validationHarvesting" 1178 print " 3_3_0_pre6 : HARVESTING:validationHarvesting" 1179 print " 3_4_0_pre1 : HARVESTING:validationHarvesting" 1181 print sep_line_short
1183 print "`RelValFS' maps to:" 1184 print " always : HARVESTING:validationHarvestingFS" 1186 print sep_line_short
1188 print "`MC' maps to:" 1189 print " always : HARVESTING:validationprodHarvesting" 1191 print sep_line_short
        print "`DQMOffline' maps to:"
        print "  always : HARVESTING:dqmHarvesting"

        """Fill our dictionary with all info needed to understand
        harvesting.

        This depends on the CMSSW version since at some point the
        names and sequences were modified.

        NOTE: There is no way (at least not that I could come up with)
        to code this in a neat generic way that can be read both by
        this method and by option_handler_list_types(). Please try
        hard to keep these two methods in sync!

        """

        assert not self.cmssw_version is None, \
               "ERROR setup_harvesting() requires " \
               "self.cmssw_version to be set!!!"

        harvesting_info = {}

        harvesting_info["DQMOffline"] = {}
        harvesting_info["DQMOffline"]["beamspot"] = None
        harvesting_info["DQMOffline"]["eventcontent"] = None
        harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"

        harvesting_info["RelVal"] = {}
        harvesting_info["RelVal"]["beamspot"] = None
        harvesting_info["RelVal"]["eventcontent"] = None
        harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"

        harvesting_info["RelValFS"] = {}
        harvesting_info["RelValFS"]["beamspot"] = None
        harvesting_info["RelValFS"]["eventcontent"] = None
        harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"

        harvesting_info["MC"] = {}
        harvesting_info["MC"]["beamspot"] = None
        harvesting_info["MC"]["eventcontent"] = None
        harvesting_info["MC"]["harvesting"] = "AtRunEnd"

        assert self.cmssw_version.startswith("CMSSW_")

        version = self.cmssw_version[6:]

        if version < "3_3_0":
            step_string = "validationHarvesting"
        elif version in ["3_3_0_pre1", "3_3_0_pre2",
                         "3_3_0_pre3", "3_3_0_pre4",
                         "3_3_0_pre6", "3_4_0_pre1"]:
            step_string = "validationHarvesting"
        else:
            step_string = "validationHarvesting+dqmHarvesting"

        harvesting_info["RelVal"]["step_string"] = step_string

        assert not step_string is None, \
               "ERROR Could not decide a RelVal harvesting sequence " \
               "for CMSSW version %s" % self.cmssw_version

        step_string = "validationHarvestingFS"
        harvesting_info["RelValFS"]["step_string"] = step_string

        step_string = "validationprodHarvesting"
        harvesting_info["MC"]["step_string"] = step_string

        assert not step_string is None, \
               "ERROR Could not decide a MC harvesting " \
               "sequence for CMSSW version %s" % self.cmssw_version

        step_string = "dqmHarvesting"
        harvesting_info["DQMOffline"]["step_string"] = step_string

        self.harvesting_info = harvesting_info

        self.logger.info("Based on the CMSSW version (%s) " \
                         "I decided to use the `HARVESTING:%s' " \
                         "sequence for %s harvesting" % \
                         (self.cmssw_version,
                          self.harvesting_info[self.harvesting_type]["step_string"],
                          self.harvesting_type))
1326 """Build the common part of the output path to be used on 1329 This consists of the CASTOR area base path specified by the 1330 user and a piece depending on the data type (data vs. MC), the 1331 harvesting type and the dataset name followed by a piece 1332 containing the run number and event count. (See comments in 1333 create_castor_path_name_special for details.) This method 1334 creates the common part, without run number and event count. 1338 castor_path = self.castor_base_dir
1343 datatype = self.datasets_information[dataset_name][
"datatype"]
1344 datatype = datatype.lower()
1345 castor_path = os.path.join(castor_path, datatype)
1348 harvesting_type = self.harvesting_type
1349 harvesting_type = harvesting_type.lower()
1350 castor_path = os.path.join(castor_path, harvesting_type)
1360 release_version = self.cmssw_version
1361 release_version = release_version.lower(). \
1364 castor_path = os.path.join(castor_path, release_version)
1367 dataset_name_escaped = self.escape_dataset_name(dataset_name)
1368 castor_path = os.path.join(castor_path, dataset_name_escaped)
1372 castor_path = os.path.normpath(castor_path)
1380 dataset_name, run_number,
1381 castor_path_common):
1382 """Create the specialised part of the CASTOR output dir name. 1384 NOTE: To avoid clashes with `incremental harvesting' 1385 (re-harvesting when a dataset grows) we have to include the 1386 event count in the path name. The underlying `problem' is that 1387 CRAB does not overwrite existing output files so if the output 1388 file already exists CRAB will fail to copy back the output. 1390 NOTE: It's not possible to create different kinds of 1391 harvesting jobs in a single call to this tool. However, in 1392 principle it could be possible to create both data and MC jobs 1395 NOTE: The number of events used in the path name is the 1396 _total_ number of events in the dataset/run at the time of 1397 harvesting. If we're doing partial harvesting the final 1398 results will reflect lower statistics. This is a) the easiest 1399 to code and b) the least likely to lead to confusion if 1400 someone ever decides to swap/copy around file blocks between 1405 castor_path = castor_path_common
1410 castor_path = os.path.join(castor_path,
"run_%d" % run_number)
1418 castor_path = os.path.join(castor_path,
"nevents")
1422 castor_path = os.path.normpath(castor_path)
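        # --- Example (not part of the original file) -------------------------
        # Combining the common and the specialised pieces above, the output
        # directory for a given dataset/run ends up shaped like (symbolic,
        # the escaping of the dataset name is done by escape_dataset_name()):
        #
        #     <castor_base_dir>/<datatype>/<harvesting_type>/<release_version>/
        #         <escaped_dataset_name>/run_<run_number>/nevents...
        # ---------------------------------------------------------------------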
1430 """Make sure all required CASTOR output dirs exist. 1432 This checks the CASTOR base dir specified by the user as well 1433 as all the subdirs required by the current set of jobs. 1437 self.logger.info(
"Checking (and if necessary creating) CASTOR " \
1438 "output area(s)...")
1441 self.create_and_check_castor_dir(self.castor_base_dir)
1445 for (dataset_name, runs)
in self.datasets_to_use.iteritems():
1448 castor_dirs.append(self.datasets_information[dataset_name] \
1449 [
"castor_path"][run])
1450 castor_dirs_unique =
list(set(castor_dirs))
1451 castor_dirs_unique.sort()
1455 ndirs = len(castor_dirs_unique)
1456 step =
max(ndirs / 10, 1)
1457 for (i, castor_dir)
in enumerate(castor_dirs_unique):
1458 if (i + 1) % step == 0
or \
1460 self.logger.info(
" %d/%d" % \
1462 self.create_and_check_castor_dir(castor_dir)
1469 self.logger.debug(
"Checking if path `%s' is empty" % \
1471 cmd =
"rfdir %s" % castor_dir
1472 (status, output) = commands.getstatusoutput(cmd)
1474 msg =
"Could not access directory `%s'" \
1475 " !!! This is bad since I should have just" \
1476 " created it !!!" % castor_dir
1477 self.logger.fatal(msg)
1480 self.logger.warning(
"Output directory `%s' is not empty:" \
1481 " new jobs will fail to" \
1482 " copy back output" % \
1490 """Check existence of the give CASTOR dir, if necessary create 1493 Some special care has to be taken with several things like 1494 setting the correct permissions such that CRAB can store the 1495 output results. Of course this means that things like 1496 /castor/cern.ch/ and user/j/ have to be recognised and treated 1499 NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for 1502 NOTE: This method uses some slightly tricky caching to make 1503 sure we don't keep over and over checking the same base paths. 1510 def split_completely(path):
1511 (parent_path, name) = os.path.split(path)
1513 return (parent_path, )
1515 return split_completely(parent_path) + (name, )
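        # --- Example (not part of the original file) -------------------------
        # split_completely() decomposes a path into all of its pieces,
        # assuming the base case reconstructed above (empty name -> return
        # the remaining root):
        #
        #     split_completely("/castor/cern.ch/cms")
        #         --> ("/", "castor", "cern.ch", "cms")
        # ---------------------------------------------------------------------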
        def extract_permissions(rfstat_output):
            """Parse the output from rfstat and return the
            5-digit permissions string."""

            permissions_line = [i for i in output.split("\n") \
                                if i.lower().find("protection") > -1]
            regexp = re.compile(".*\(([0123456789]{5})\).*")
            match = regexp.search(rfstat_output)
            if not match or len(match.groups()) != 1:
                msg = "Could not extract permissions " \
                      "from output: %s" % rfstat_output
                self.logger.fatal(msg)
            permissions = match.group(1)
        castor_paths_dont_touch = {
            0: ["/", "castor", "cern.ch", "cms", "store", "temp",
                "dqm", "offline", "user"],
            -1: ["user", "store"]
            }

        self.logger.debug("Checking CASTOR path `%s'" % castor_dir)

        castor_path_pieces = split_completely(castor_dir)

        check_sizes = castor_paths_dont_touch.keys()

        len_castor_path_pieces = len(castor_path_pieces)
        for piece_index in xrange (len_castor_path_pieces):
            skip_this_path_piece = False
            piece = castor_path_pieces[piece_index]
            for check_size in check_sizes:
                if (piece_index + check_size) > -1:
                    if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
                        skip_this_path_piece = True

            path = os.path.join(path, piece)

                if path in self.castor_path_checks_cache:
            except AttributeError:
                self.castor_path_checks_cache = []
            self.castor_path_checks_cache.append(path)

            if not skip_this_path_piece:

                self.logger.debug("Checking if path `%s' exists" % \
                                  path)
                cmd = "rfstat %s" % path
                (status, output) = commands.getstatusoutput(cmd)

                    self.logger.debug("Creating path `%s'" % path)
                    cmd = "nsmkdir -m 775 %s" % path
                    (status, output) = commands.getstatusoutput(cmd)
                        msg = "Could not create directory `%s'" % path
                        self.logger.fatal(msg)
                    cmd = "rfstat %s" % path
                    (status, output) = commands.getstatusoutput(cmd)

                permissions = extract_permissions(output)
                if not permissions.startswith("40"):
                    msg = "Path `%s' is not a directory(?)" % path
                    self.logger.fatal(msg)

                self.logger.debug("Checking permissions for path `%s'" % path)
                cmd = "rfstat %s" % path
                (status, output) = commands.getstatusoutput(cmd)
                    msg = "Could not obtain permissions for directory `%s'" % \
                          path
                    self.logger.fatal(msg)

                permissions = extract_permissions(output)[-3:]

                if piece_index == (len_castor_path_pieces - 1):
                    permissions_target = "775"
                else:
                    permissions_target = "775"

                permissions_new = []
                for (i, j) in zip(permissions, permissions_target):
                permissions_new = "".join(permissions_new)
                self.logger.debug("  current permissions: %s" % \
                                  permissions)
                self.logger.debug("  target permissions : %s" % \
                                  permissions_target)
                if permissions_new != permissions:
                    self.logger.debug("Changing permissions of `%s' " \
                                      "to %s (were %s)" % \
                                      (path, permissions_new, permissions))
                    cmd = "rfchmod %s %s" % (permissions_new, path)
                    (status, output) = commands.getstatusoutput(cmd)
                        msg = "Could not change permissions for path `%s' " \
                              "to %s" % (path, permissions_new)
                        self.logger.fatal(msg)
                else:
                    self.logger.debug("  Permissions ok (%s)" % permissions_new)
        sites_forbidden = []

        if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
            self.caf_access = True

        if self.caf_access == False:
            sites_forbidden.append("caf.cern.ch")

            "cmssrm-fzk.gridka.de",
            "gridka-dCache.fzk.de",
            "srm-cms.gridpp.rl.ac.uk",
            "srm.grid.sinica.edu.tw",
            "srm2.grid.sinica.edu.tw",
            "storm-fe-cms.cr.cnaf.infn.it"

        country_codes = {
            "CAF" : "caf.cern.ch",
            "CH"  : "srm-cms.cern.ch",
            "FR"  : "ccsrm.in2p3.fr",
            "DE"  : "cmssrm-fzk.gridka.de",
            "GOV" : "cmssrm.fnal.gov",
            "DE2" : "gridka-dCache.fzk.de",
            "UK"  : "srm-cms.gridpp.rl.ac.uk",
            "TW"  : "srm.grid.sinica.edu.tw",
            "TW2" : "srm2.grid.sinica.edu.tw",
            "ES"  : "srmcms.pic.es",
            "IT"  : "storm-fe-cms.cr.cnaf.infn.it"
            }

        if self.non_t1access:
            sites_forbidden.extend(all_t1)

        for site in sites_forbidden:

        if self.preferred_site in country_codes:
            self.preferred_site = country_codes[self.preferred_site]

        if self.preferred_site != "no preference":
            if self.preferred_site in sites:
                sites = [self.preferred_site]
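        # --- Example (not part of the original file) -------------------------
        # Effect of the country-code shortcuts above on site selection:
        #
        #     --site DE   -> preferred_site becomes "cmssrm-fzk.gridka.de"
        #     --site CAF  -> preferred_site becomes "caf.cern.ch"
        #                    (and caf_access is switched on, see above)
        #
        # If the preferred site is among the candidates, the candidate list is
        # narrowed down to just that site.
        # ---------------------------------------------------------------------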
        while len(sites) > 0 and \
              se_name is None:

                    t1_sites.append(site)
                if site == "caf.cern.ch":
                    t1_sites.append(site)

            if len(t1_sites) > 0:
                se_name = choice(t1_sites)
            else:
                se_name = choice(sites)

            if se_name in self.sites_and_versions_cache and \
               cmssw_version in self.sites_and_versions_cache[se_name]:
                if self.sites_and_versions_cache[se_name][cmssw_version]:
                    self.logger.debug("  --> rejecting site `%s'" % se_name)
                    sites.remove(se_name)
            else:
                self.logger.info("Checking if site `%s' " \
                                 "has CMSSW version `%s'" % \
                                 (se_name, cmssw_version))
                self.sites_and_versions_cache[se_name] = {}

                cmd = "lcg-info --list-ce " \
                      "CEStatus=Production," \
                      (cmssw_version, se_name)
                (status, output) = commands.getstatusoutput(cmd)
                    self.logger.error("Could not check site information " \
                                      "for site `%s'" % se_name)
                if (len(output) > 0) or (se_name == "caf.cern.ch"):
                    self.sites_and_versions_cache[se_name][cmssw_version] = True
                else:
                    self.sites_and_versions_cache[se_name][cmssw_version] = False
                    self.logger.debug("  --> rejecting site `%s'" % se_name)
                    sites.remove(se_name)

        if site_name is self.no_matching_site_found_str:
            self.logger.error("  --> no matching site found")
            self.logger.error("  --> Your release or SCRAM " \
                              "architecture may not be available " \
                              "anywhere on the (LCG) grid.")
            self.logger.debug("  (command used: `%s')" % cmd)
        else:
            self.logger.debug("  --> selected site `%s'" % site_name)

        if site_name is None:
            site_name = self.no_matching_site_found_str
        self.all_sites_found = False

        parser = optparse.OptionParser(version="%s %s" % \
                                       ("%prog", self.version),

        self.option_parser = parser

        parser.add_option("-d", "--debug",
                          help="Switch on debug mode",
                          callback=self.option_handler_debug)

        parser.add_option("-q", "--quiet",
                          help="Be less verbose",
                          callback=self.option_handler_quiet)

        parser.add_option("", "--force",
                          help="Force mode. Do not abort on sanity check "
                          callback=self.option_handler_force)

        parser.add_option("", "--harvesting_type",
                          help="Harvesting type: %s" % \
                          ", ".join(self.harvesting_types),
                          callback=self.option_handler_harvesting_type,
                          metavar="HARVESTING_TYPE")

        parser.add_option("", "--harvesting_mode",
                          help="Harvesting mode: %s (default = %s)" % \
                          (", ".join(self.harvesting_modes),
                           self.harvesting_mode_default),
                          callback=self.option_handler_harvesting_mode,
                          metavar="HARVESTING_MODE")

        parser.add_option("", "--globaltag",
                          help="GlobalTag to use. Default is the ones " \
                          "the dataset was created with for MC, for data " \
                          "a GlobalTag has to be specified.",
                          callback=self.option_handler_globaltag,
                          metavar="GLOBALTAG")

        parser.add_option("", "--no-ref-hists",
                          help="Don't use any reference histograms",
                          callback=self.option_handler_no_ref_hists)

        parser.add_option("", "--frontier-connection",
                          help="Use this Frontier connection to find " \
                          "GlobalTags and LocalTags (for reference " \
                          "histograms).\nPlease only use this for " \
                          callback=self.option_handler_frontier_connection,

        parser.add_option("", "--frontier-connection-for-globaltag",
                          help="Use this Frontier connection to find " \
                          "GlobalTags.\nPlease only use this for " \
                          callback=self.option_handler_frontier_connection,

        parser.add_option("", "--frontier-connection-for-refhists",
                          help="Use this Frontier connection to find " \
                          "LocalTags (for reference " \
                          "histograms).\nPlease only use this for " \
                          callback=self.option_handler_frontier_connection,

        parser.add_option("", "--dataset",
                          help="Name (or regexp) of dataset(s) to process",
                          callback=self.option_handler_input_spec,

        parser.add_option("", "--dataset-ignore",
                          help="Name (or regexp) of dataset(s) to ignore",
                          callback=self.option_handler_input_spec,
                          metavar="DATASET-IGNORE")

        parser.add_option("", "--runs",
                          help="Run number(s) to process",
                          callback=self.option_handler_input_spec,

        parser.add_option("", "--runs-ignore",
                          help="Run number(s) to ignore",
                          callback=self.option_handler_input_spec,
                          metavar="RUNS-IGNORE")

        parser.add_option("", "--datasetfile",
                          help="File containing list of dataset names " \
                          "(or regexps) to process",
                          callback=self.option_handler_input_spec,
                          metavar="DATASETFILE")

        parser.add_option("", "--datasetfile-ignore",
                          help="File containing list of dataset names " \
                          "(or regexps) to ignore",
                          callback=self.option_handler_input_spec,
                          metavar="DATASETFILE-IGNORE")

        parser.add_option("", "--runslistfile",
                          help="File containing list of run numbers " \
                          callback=self.option_handler_input_spec,
                          metavar="RUNSLISTFILE")

        parser.add_option("", "--runslistfile-ignore",
                          help="File containing list of run numbers " \
                          callback=self.option_handler_input_spec,
                          metavar="RUNSLISTFILE-IGNORE")

        parser.add_option("", "--Jsonrunfile",
                          help="Jsonfile containing dictionary of run/lumisections pairs. " \
                          "All lumisections of runs contained in dictionary are processed.",
                          callback=self.option_handler_input_Jsonrunfile,
                          metavar="JSONRUNFILE")

        parser.add_option("", "--Jsonfile",
                          help="Jsonfile containing dictionary of run/lumisections pairs. " \
                          "Only specified lumisections of runs contained in dictionary are processed.",
                          callback=self.option_handler_input_Jsonfile,

        parser.add_option("", "--todo-file",
                          help="Todo file containing a list of runs to process.",
                          callback=self.option_handler_input_todofile,
                          metavar="TODO-FILE")

        parser.add_option("", "--refhistmappingfile",
                          help="File to be used for the reference " \
                          "histogram mappings. Default: `%s'." % \
                          self.ref_hist_mappings_file_name_default,
                          callback=self.option_handler_ref_hist_mapping_file,
                          metavar="REFHISTMAPPING-FILE")

        parser.add_option("", "--castordir",
                          help="Place on CASTOR to store results. " \
                          "Default: `%s'." % \
                          self.castor_base_dir_default,
                          callback=self.option_handler_castor_dir,
                          metavar="CASTORDIR")

        parser.add_option("", "--no-t1access",
                          help="Try to create jobs that will run " \
                          "without special `t1access' role",
                          callback=self.option_handler_no_t1access)

        parser.add_option("", "--caf-access",
                          help="Crab jobs may run " \
                          callback=self.option_handler_caf_access)

        parser.add_option("", "--saveByLumiSection",
                          help="set saveByLumiSection=1 in harvesting cfg file",
                          callback=self.option_handler_saveByLumiSection)

        parser.add_option("", "--automatic-crab-submission",
                          help="Crab jobs are created and " \
                          "submitted automatically",
                          callback=self.option_handler_crab_submission)

        parser.add_option("", "--max-sites",
                          help="Max. number of sites each job is submitted to",
                          callback=self.option_handler_sites,

        parser.add_option("", "--site",
                          help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
                          srm-cms.cern.ch : CH \
                          ccsrm.in2p3.fr : FR \
                          cmssrm-fzk.gridka.de : DE \
                          cmssrm.fnal.gov : GOV \
                          gridka-dCache.fzk.de : DE2 \
                          srm-cms.gridpp.rl.ac.uk : UK \
                          srm.grid.sinica.edu.tw : TW \
                          srm2.grid.sinica.edu.tw : TW2 \
                          srmcms.pic.es : ES \
                          storm-fe-cms.cr.cnaf.infn.it : IT",
                          callback=self.option_handler_preferred_site,

        parser.add_option("-l", "--list",
                          help="List all harvesting types and their " \
                          "corresponding sequence names",
                          callback=self.option_handler_list_types)

        if len(self.cmd_line_opts) < 1:
            self.cmd_line_opts = ["--help"]

        for i in ["-d", "--debug",
                  "-q", "--quiet"]:
            if i in self.cmd_line_opts:
                self.cmd_line_opts.remove(i)
                self.cmd_line_opts.insert(0, i)

        parser.set_defaults()
        (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
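        # --- Example invocation (not part of the original file) --------------
        # A typical command line, using only options registered above; the
        # dataset name and GlobalTag are purely illustrative:
        #
        #     cmsHarvester.py --harvesting_type DQMOffline \
        #                     --harvesting_mode single-step \
        #                     --globaltag GR10_P_V4::All \
        #                     --dataset /Cosmics/SomeEra-SomeReco/RECO \
        #                     --castordir /castor/cern.ch/user/x/xyz/harvesting
        #
        # `-l'/`--list' prints the harvesting-type-to-sequence mapping.
        # ---------------------------------------------------------------------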
2193 """Check completeness and correctness of input information. 2195 Check that all required information has been specified and 2196 that, at least as far as can be easily checked, it makes 2199 NOTE: This is also where any default values are applied. 2203 self.logger.info(
"Checking completeness/correctness of input...")
2207 if len(self.args) > 0:
2208 msg =
"Sorry but I don't understand `%s'" % \
2209 (
" ".
join(self.args))
2210 self.logger.fatal(msg)
2216 if self.harvesting_mode ==
"two-step":
2217 msg =
"--------------------\n" \
2218 " Sorry, but for the moment (well, till it works)" \
2219 " the two-step mode has been disabled.\n" \
2220 "--------------------\n" 2221 self.logger.fatal(msg)
2226 if self.harvesting_type
is None:
2227 msg =
"Please specify a harvesting type" 2228 self.logger.fatal(msg)
2231 if self.harvesting_mode
is None:
2232 self.harvesting_mode = self.harvesting_mode_default
2233 msg =
"No harvesting mode specified --> using default `%s'" % \
2234 self.harvesting_mode
2235 self.logger.warning(msg)
2241 if self.input_method[
"datasets"][
"use"]
is None:
2242 msg =
"Please specify an input dataset name " \
2243 "or a list file name" 2244 self.logger.fatal(msg)
2249 assert not self.input_name[
"datasets"][
"use"]
is None 2256 if self.use_ref_hists:
2257 if self.ref_hist_mappings_file_name
is None:
2258 self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2259 msg =
"No reference histogram mapping file specified --> " \
2260 "using default `%s'" % \
2261 self.ref_hist_mappings_file_name
2262 self.logger.warning(msg)
2268 if self.castor_base_dir
is None:
2269 self.castor_base_dir = self.castor_base_dir_default
2270 msg =
"No CASTOR area specified -> using default `%s'" % \
2271 self.castor_base_dir
2272 self.logger.warning(msg)
2276 if not self.castor_base_dir.startswith(self.castor_prefix):
2277 msg =
"CASTOR area does not start with `%s'" % \
2279 self.logger.fatal(msg)
2280 if self.castor_base_dir.find(
"castor") > -1
and \
2281 not self.castor_base_dir.find(
"cern.ch") > -1:
2282 self.logger.fatal(
"Only CERN CASTOR is supported")
2293 if self.globaltag
is None:
2294 self.logger.warning(
"No GlobalTag specified. This means I cannot")
2295 self.logger.warning(
"run on data, only on MC.")
2296 self.logger.warning(
"I will skip all data datasets.")
2301 if not self.globaltag
is None:
2302 if not self.globaltag.endswith(
"::All"):
2303 self.logger.warning(
"Specified GlobalTag `%s' does " \
2304 "not end in `::All' --> " \
2305 "appending this missing piece" % \
2307 self.globaltag =
"%s::All" % self.globaltag
2312 for (key, value)
in self.frontier_connection_name.iteritems():
2313 frontier_type_str =
"unknown" 2314 if key ==
"globaltag":
2315 frontier_type_str =
"the GlobalTag" 2316 elif key ==
"refhists":
2317 frontier_type_str =
"the reference histograms" 2319 if self.frontier_connection_overridden[key] ==
True:
2323 self.logger.info(
"Using %sdefault Frontier " \
2324 "connection for %s: `%s'" % \
2325 (non_str, frontier_type_str, value))
2334 """Check if CMSSW is setup. 2341 cmssw_version = os.getenv(
"CMSSW_VERSION")
2342 if cmssw_version
is None:
2343 self.logger.fatal(
"It seems CMSSW is not setup...")
2344 self.logger.fatal(
"($CMSSW_VERSION is empty)")
2345 raise Error(
"ERROR: CMSSW needs to be setup first!")
2347 self.cmssw_version = cmssw_version
2348 self.logger.info(
"Found CMSSW version %s properly set up" % \
2357 """Check if DBS is setup. 2364 dbs_home = os.getenv(
"DBSCMD_HOME")
2365 if dbs_home
is None:
2366 self.logger.fatal(
"It seems DBS is not setup...")
2367 self.logger.fatal(
" $DBSCMD_HOME is empty")
2368 raise Error(
"ERROR: DBS needs to be setup first!")
2386 self.logger.debug(
"Found DBS properly set up")
2394 """Setup the Python side of DBS. 2396 For more information see the DBS Python API documentation: 2397 https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation 2403 args[
"url"]=
"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2404 "servlet/DBSServlet" 2408 except DBSAPI.dbsApiException.DbsApiException
as ex:
2409 self.logger.fatal(
"Caught DBS API exception %s: %s " % \
2410 (ex.getClassName(), ex.getErrorMessage()))
2411 if ex.getErrorCode()
not in (
None,
""):
2412 logger.debug(
"DBS exception error code: ", ex.getErrorCode())
2420 """Use DBS to resolve a wildcarded dataset name. 2426 assert not self.dbs_api
is None 2431 if not (dataset_name.startswith(
"/")
and \
2432 dataset_name.endswith(
"RECO")):
2433 self.logger.warning(
"Dataset name `%s' does not sound " \
2434 "like a valid dataset name!" % \
2440 dbs_query =
"find dataset where dataset like %s " \
2441 "and dataset.status = VALID" % \
2444 api_result = api.executeQuery(dbs_query)
2445 except DBSAPI.dbsApiException.DbsApiException:
2446 msg =
"ERROR: Could not execute DBS query" 2447 self.logger.fatal(msg)
2452 parser = xml.sax.make_parser()
2453 parser.setContentHandler(handler)
2457 xml.sax.parseString(api_result, handler)
2458 except SAXParseException:
2459 msg =
"ERROR: Could not parse DBS server output" 2460 self.logger.fatal(msg)
2464 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2468 datasets = handler.results.values()[0]
2476 """Ask DBS for the CMSSW version used to create this dataset. 2482 assert not self.dbs_api
is None 2486 dbs_query =
"find algo.version where dataset = %s " \
2487 "and dataset.status = VALID" % \
2490 api_result = api.executeQuery(dbs_query)
2491 except DBSAPI.dbsApiException.DbsApiException:
2492 msg =
"ERROR: Could not execute DBS query" 2493 self.logger.fatal(msg)
2497 parser = xml.sax.make_parser()
2498 parser.setContentHandler(handler)
2501 xml.sax.parseString(api_result, handler)
2502 except SAXParseException:
2503 msg =
"ERROR: Could not parse DBS server output" 2504 self.logger.fatal(msg)
2508 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2511 cmssw_version = handler.results.values()[0]
2514 assert len(cmssw_version) == 1
2517 cmssw_version = cmssw_version[0]
2520 return cmssw_version
2570 """Ask DBS for the list of runs in a given dataset. 2572 # NOTE: This does not (yet?) skip/remove empty runs. There is 2573 # a bug in the DBS entry run.numevents (i.e. it always returns 2574 # zero) which should be fixed in the `next DBS release'. 2576 # https://savannah.cern.ch/bugs/?53452 2577 # https://savannah.cern.ch/bugs/?53711 2588 assert not self.dbs_api
is None 2592 dbs_query =
"find run where dataset = %s " \
2593 "and dataset.status = VALID" % \
2596 api_result = api.executeQuery(dbs_query)
2597 except DBSAPI.dbsApiException.DbsApiException:
2598 msg =
"ERROR: Could not execute DBS query" 2599 self.logger.fatal(msg)
2603 parser = xml.sax.make_parser()
2604 parser.setContentHandler(handler)
2607 xml.sax.parseString(api_result, handler)
2608 except SAXParseException:
2609 msg =
"ERROR: Could not parse DBS server output" 2610 self.logger.fatal(msg)
2614 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2617 runs = handler.results.values()[0]
2619 runs = [
int(i)
for i
in runs]
2628 """Ask DBS for the globaltag corresponding to a given dataset. 2631 # This does not seem to work for data datasets? E.g. for 2632 # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO 2633 # Probaly due to the fact that the GlobalTag changed during 2641 assert not self.dbs_api
is None 2645 dbs_query =
"find dataset.tag where dataset = %s " \
2646 "and dataset.status = VALID" % \
2649 api_result = api.executeQuery(dbs_query)
2650 except DBSAPI.dbsApiException.DbsApiException:
2651 msg =
"ERROR: Could not execute DBS query" 2652 self.logger.fatal(msg)
2656 parser = xml.sax.make_parser()
2657 parser.setContentHandler(parser)
2660 xml.sax.parseString(api_result, handler)
2661 except SAXParseException:
2662 msg =
"ERROR: Could not parse DBS server output" 2663 self.logger.fatal(msg)
2667 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2670 globaltag = handler.results.values()[0]
2673 assert len(globaltag) == 1
2676 globaltag = globaltag[0]
2684 """Ask DBS for the the data type (data or mc) of a given 2691 assert not self.dbs_api
is None 2695 dbs_query =
"find datatype.type where dataset = %s " \
2696 "and dataset.status = VALID" % \
2699 api_result = api.executeQuery(dbs_query)
2700 except DBSAPI.dbsApiException.DbsApiException:
2701 msg =
"ERROR: Could not execute DBS query" 2702 self.logger.fatal(msg)
2706 parser = xml.sax.make_parser()
2707 parser.setContentHandler(handler)
2710 xml.sax.parseString(api_result, handler)
2711 except SAXParseException:
2712 msg =
"ERROR: Could not parse DBS server output" 2713 self.logger.fatal(msg)
2717 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2720 datatype = handler.results.values()[0]
2723 assert len(datatype) == 1
2726 datatype = datatype[0]
2737 """Determine the number of events in a given dataset (and run). 2739 Ask DBS for the number of events in a dataset. If a run number 2740 is specified the number of events returned is that in that run 2741 of that dataset. If problems occur we throw an exception. 2744 # Since DBS does not return the number of events correctly, 2745 # neither for runs nor for whole datasets, we have to work 2746 # around that a bit... 2753 assert not self.dbs_api
is None 2757 dbs_query =
"find file.name, file.numevents where dataset = %s " \
2758 "and dataset.status = VALID" % \
2760 if not run_number
is None:
2761 dbs_query = dbq_query + (
" and run = %d" % run_number)
2763 api_result = api.executeQuery(dbs_query)
2764 except DBSAPI.dbsApiException.DbsApiException:
2765 msg =
"ERROR: Could not execute DBS query" 2766 self.logger.fatal(msg)
2770 parser = xml.sax.make_parser()
2771 parser.setContentHandler(handler)
2774 xml.sax.parseString(api_result, handler)
2775 except SAXParseException:
2776 msg =
"ERROR: Could not parse DBS server output" 2777 self.logger.fatal(msg)
2781 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 2784 num_events = sum(handler.results[
"file.numevents"])
3078 """Figure out the number of events in each run of this dataset. 3080 This is a more efficient way of doing this than calling 3081 dbs_resolve_number_of_events for each run. 3085 self.logger.debug(
"Checking spread of dataset `%s'" % dataset_name)
3089 assert not self.dbs_api
is None 3093 dbs_query =
"find run.number, site, file.name, file.numevents " \
3094 "where dataset = %s " \
3095 "and dataset.status = VALID" % \
3098 api_result = api.executeQuery(dbs_query)
3099 except DBSAPI.dbsApiException.DbsApiException:
3100 msg =
"ERROR: Could not execute DBS query" 3101 self.logger.fatal(msg)
3104 handler =
DBSXMLHandler([
"run.number",
"site",
"file.name",
"file.numevents"])
3105 parser = xml.sax.make_parser()
3106 parser.setContentHandler(handler)
3149 xml.sax.parseString(api_result, handler)
3150 except SAXParseException:
3151 msg =
"ERROR: Could not parse DBS server output" 3152 self.logger.fatal(msg)
3156 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!" 3163 for (index, site_name)
in enumerate(handler.results[
"site"]):
3173 if len(site_name) < 1:
3175 run_number =
int(handler.results[
"run.number"][index])
3176 file_name = handler.results[
"file.name"][index]
3177 nevents =
int(handler.results[
"file.numevents"][index])
3180 if run_number
not in files_info:
3182 files_info[run_number] = {}
3183 files_info[run_number][file_name] = (nevents,
3185 elif file_name
not in files_info[run_number]:
3187 files_info[run_number][file_name] = (nevents,
3194 assert nevents == files_info[run_number][file_name][0]
3196 files_info[run_number][file_name][1].
append(site_name)
3201 for run_number
in files_info.keys():
3202 files_without_sites = [i
for (i, j)
in \
3203 files_info[run_number].
items() \
3205 if len(files_without_sites) > 0:
3206 self.logger.warning(
"Removing %d file(s)" \
3207 " with empty site names" % \
3208 len(files_without_sites))
3209 for file_name
in files_without_sites:
3210 del files_info[run_number][file_name]
3216 num_events_catalog = {}
3217 for run_number
in files_info.keys():
3218 site_names =
list(set([j
for i
in files_info[run_number].
values()
for j
in i[1]]))
3224 if len(site_names) > 1:
3232 all_file_names = files_info[run_number].
keys()
3233 all_file_names = set(all_file_names)
3234 sites_with_complete_copies = []
3235 for site_name
in site_names:
3236 files_at_site = [i
for (i, (j, k)) \
3237 in files_info[run_number].
items() \
3239 files_at_site = set(files_at_site)
3240 if files_at_site == all_file_names:
3241 sites_with_complete_copies.append(site_name)
3242 if len(sites_with_complete_copies) < 1:
3248 if len(sites_with_complete_copies) > 1:
3267 self.logger.debug(
" -> run appears to be `mirrored'")
3269 self.logger.debug(
" -> run appears to be spread-out")
3272 len(sites_with_complete_copies) != len(site_names):
3276 for (file_name, (i, sites))
in files_info[run_number].
items():
3277 complete_sites = [site
for site
in sites \
3278 if site
in sites_with_complete_copies]
3279 files_info[run_number][file_name] = (i, complete_sites)
3280 site_names = sites_with_complete_copies
3282 self.logger.debug(
" for run #%d:" % run_number)
3283 num_events_catalog[run_number] = {}
3284 num_events_catalog[run_number][
"all_sites"] = sum([i[0]
for i
in files_info[run_number].
values()])
3285 if len(site_names) < 1:
3286 self.logger.debug(
" run is not available at any site")
3287 self.logger.debug(
" (but should contain %d events" % \
3288 num_events_catalog[run_number][
"all_sites"])
3290 self.logger.debug(
" at all sites combined there are %d events" % \
3291 num_events_catalog[run_number][
"all_sites"])
3292 for site_name
in site_names:
3293 num_events_catalog[run_number][site_name] = sum([i[0]
for i
in files_info[run_number].
values()
if site_name
in i[1]])
3294 self.logger.debug(
" at site `%s' there are %d events" % \
3295 (site_name, num_events_catalog[run_number][site_name]))
3296 num_events_catalog[run_number][
"mirrored"] = mirrored
3299 return num_events_catalog
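        # --- Example (not part of the original file) -------------------------
        # Shape of the value returned above, for one run that is fully
        # available ("mirrored") at two sites; run number and event counts are
        # illustrative:
        #
        #     {
        #         143179: {
        #             "all_sites": 125000,
        #             "srm-cms.cern.ch": 125000,
        #             "ccsrm.in2p3.fr": 125000,
        #             "mirrored": True,
        #         },
        #     }
        # ---------------------------------------------------------------------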
3359 """Build a list of all datasets to be processed. 3368 if input_method
is None:
3370 elif input_method ==
"dataset":
3375 self.logger.info(
"Asking DBS for dataset names")
3376 dataset_names = self.dbs_resolve_dataset_name(input_name)
3377 elif input_method ==
"datasetfile":
3382 self.logger.info(
"Reading input from list file `%s'" % \
3385 listfile = open(
"/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name,
"r") 3386 print "open listfile" 3387 for dataset
in listfile:
3389 dataset_stripped = dataset.strip()
3390 if len(dataset_stripped) < 1:
3393 if dataset_stripped[0] !=
"#":
3394 dataset_names.extend(self. \
3398 msg =
"ERROR: Could not open input list file `%s'" % \
3400 self.logger.fatal(msg)
3405 assert False,
"Unknown input method `%s'" % input_method
3412 dataset_names =
list(set(dataset_names))
3415 dataset_names.sort()
3418 return dataset_names
3423 """Build a list of datasets to process. 3427 self.logger.info(
"Building list of datasets to consider...")
3429 input_method = self.input_method[
"datasets"][
"use"]
3430 input_name = self.input_name[
"datasets"][
"use"]
3431 dataset_names = self.build_dataset_list(input_method,
3434 [
None] * len(dataset_names))))
3436 self.logger.info(
" found %d dataset(s) to process:" % \
3438 for dataset
in dataset_names:
3439 self.logger.info(
" `%s'" % dataset)
3446 """Build a list of datasets to ignore. 3448 NOTE: We should always have a list of datasets to process, but 3449 it may be that we don't have a list of datasets to ignore. 3453 self.logger.info(
"Building list of datasets to ignore...")
3455 input_method = self.input_method[
"datasets"][
"ignore"]
3456 input_name = self.input_name[
"datasets"][
"ignore"]
3457 dataset_names = self.build_dataset_list(input_method,
3460 [
None] * len(dataset_names))))
3462 self.logger.info(
" found %d dataset(s) to ignore:" % \
3464 for dataset
in dataset_names:
3465 self.logger.info(
" `%s'" % dataset)
        if input_method is None:
        elif input_method == "runs":
            self.logger.info("Reading list of runs from the " \
                             "command line")
            runs.extend([int(i.strip()) \
                         for i in input_name.split(",") \
                         if len(i.strip()) > 0])
        elif input_method == "runslistfile":
            self.logger.info("Reading list of runs from file `%s'" % \
                             input_name)
                listfile = open(input_name, "r")
                for run in listfile:
                    run_stripped = run.strip()
                    if len(run_stripped) < 1:
                    if run_stripped[0] != "#":
                        runs.append(int(run_stripped))
                msg = "ERROR: Could not open input list file `%s'" % \
                      input_name
                self.logger.fatal(msg)
        else:
            assert False, "Unknown input method `%s'" % input_method

        runs = list(set(runs))
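        # --- Example (not part of the original file) -------------------------
        # What the "runs" branch above does with a command-line value (run
        # numbers illustrative):
        #
        #     --runs "140160, 140180,140331"
        #         --> runs contains the integers 140160, 140180 and 140331
        #
        # Whitespace is stripped, empty items are skipped and the final
        # list(set(...)) removes duplicates.
        # ---------------------------------------------------------------------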
3523 """Build a list of runs to process. 3527 self.logger.info(
"Building list of runs to consider...")
3529 input_method = self.input_method[
"runs"][
"use"]
3530 input_name = self.input_name[
"runs"][
"use"]
3531 runs = self.build_runs_list(input_method, input_name)
3534 self.logger.info(
" found %d run(s) to process:" % \
3537 self.logger.info(
" %s" %
", ".
join([
str(i)
for i
in runs]))
3544 """Build a list of runs to ignore. 3546 NOTE: We should always have a list of runs to process, but 3547 it may be that we don't have a list of runs to ignore. 3551 self.logger.info(
"Building list of runs to ignore...")
3553 input_method = self.input_method[
"runs"][
"ignore"]
3554 input_name = self.input_name[
"runs"][
"ignore"]
3555 runs = self.build_runs_list(input_method, input_name)
3558 self.logger.info(
" found %d run(s) to ignore:" % \
3561 self.logger.info(
" %s" %
", ".
join([
str(i)
for i
in runs]))
3568 """Update the list of datasets taking into account the ones to 3571 Both lists have been generated before from DBS and both are 3572 assumed to be unique. 3574 NOTE: The advantage of creating the ignore list from DBS (in 3575 case a regexp is given) and matching that instead of directly 3576 matching the ignore criterion against the list of datasets (to 3577 consider) built from DBS is that in the former case we're sure 3578 that all regexps are treated exactly as DBS would have done 3579 without the cmsHarvester. 3581 NOTE: This only removes complete samples. Exclusion of single 3582 runs is done by the book keeping. So the assumption is that a 3583 user never wants to harvest just part (i.e. n out of N runs) 3588 self.logger.info(
"Processing list of datasets to ignore...")
3590 self.logger.debug(
"Before processing ignore list there are %d " \
3591 "datasets in the list to be processed" % \
3592 len(self.datasets_to_use))
3595 dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3596 for dataset_name
in self.datasets_to_use.keys():
3597 if dataset_name
in self.datasets_to_ignore.keys():
3598 del dataset_names_filtered[dataset_name]
3600 self.logger.info(
" --> Removed %d dataset(s)" % \
3601 (len(self.datasets_to_use) -
3602 len(dataset_names_filtered)))
3604 self.datasets_to_use = dataset_names_filtered
3606 self.logger.debug(
"After processing ignore list there are %d " \
3607 "datasets in the list to be processed" % \
3608 len(self.datasets_to_use))
        self.logger.info("Processing list of runs to use and ignore...")

        runs_to_use = self.runs_to_use
        runs_to_ignore = self.runs_to_ignore

        for dataset_name in self.datasets_to_use:
            runs_in_dataset = self.datasets_information[dataset_name]["runs"]

            runs_to_use_tmp = []
            for run in runs_to_use:
                if not run in runs_in_dataset:
                    self.logger.warning("Dataset `%s' does not contain " \
                                        "requested run %d " \
                                        "--> ignoring `use' of this run" % \
                                        (dataset_name, run))
                else:
                    runs_to_use_tmp.append(run)

            if len(runs_to_use) > 0:
                runs = runs_to_use_tmp
                self.logger.info("Using %d out of %d runs " \
                                 "of dataset `%s'" % \
                                 (len(runs), len(runs_in_dataset),
                                  dataset_name))
            else:
                runs = runs_in_dataset

            if len(runs_to_ignore) > 0:
                    if not run in runs_to_ignore:
                        runs_tmp.append(run)
                self.logger.info("Ignoring %d out of %d runs " \
                                 "of dataset `%s'" % \
                                 (len(runs)- len(runs_tmp),
                                  len(runs_in_dataset),
                                  dataset_name))

            if self.todofile != "YourToDofile.txt":
                print "Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile
                cmd="grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
                (status, output)=commands.getstatusoutput(cmd)
                    if run_str in output:
                        runs_todo.append(run)
                self.logger.info("Using %d runs " \
                                 "of dataset `%s'" % \

            if self.Jsonfilename != "YourJSON.txt":
                self.Jsonlumi = True
                self.logger.info("Reading runs and lumisections from file `%s'" % \
                                 self.Jsonfilename)
                    Jsonfile = open(self.Jsonfilename, "r")
                    for names in Jsonfile:
                        dictNames= eval(str(names))
                        for key in dictNames:
                            Json_runs.append(intkey)
                    msg = "ERROR: Could not open Jsonfile `%s'" % \
                          self.Jsonfilename
                    self.logger.fatal(msg)
                    if run in Json_runs:
                        good_runs.append(run)
                self.logger.info("Using %d runs " \
                                 "of dataset `%s'" % \

            if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):
                self.logger.info("Reading runs from file `%s'" % \
                                 self.Jsonrunfilename)
                    Jsonfile = open(self.Jsonrunfilename, "r")
                    for names in Jsonfile:
                        dictNames= eval(str(names))
                        for key in dictNames:
                            Json_runs.append(intkey)
                    msg = "ERROR: Could not open Jsonfile `%s'" % \
                          self.Jsonrunfilename
                    self.logger.fatal(msg)
                    if run in Json_runs:
                        good_runs.append(run)
                self.logger.info("Using %d runs " \
                                 "of dataset `%s'" % \

            self.datasets_to_use[dataset_name] = runs
3744 """Remove all but the largest part of all datasets. 3746 This allows us to harvest at least part of these datasets 3747 using single-step harvesting until the two-step approach 3753 assert self.harvesting_mode ==
"single-step-allow-partial" 3756 for dataset_name
in self.datasets_to_use:
3757 for run_number
in self.datasets_information[dataset_name][
"runs"]:
3758 max_events =
max(self.datasets_information[dataset_name][
"sites"][run_number].
values())
3759 sites_with_max_events = [i[0]
for i
in self.datasets_information[dataset_name][
"sites"][run_number].
items()
if i[1] == max_events]
3760 self.logger.warning(
"Singlifying dataset `%s', " \
3762 (dataset_name, run_number))
3763 cmssw_version = self.datasets_information[dataset_name] \
3765 selected_site = self.pick_a_site(sites_with_max_events,
3769 nevents_old = self.datasets_information[dataset_name][
"num_events"][run_number]
3770 self.logger.warning(
" --> " \
3771 "only harvesting partial statistics: " \
3772 "%d out of %d events (5.1%f%%) " \
3776 100. * max_events / nevents_old,
3778 self.logger.warning(
"!!! Please note that the number of " \
3779 "events in the output path name will " \
3780 "NOT reflect the actual statistics in " \
3781 "the harvested results !!!")
3788 self.datasets_information[dataset_name][
"sites"][run_number] = {selected_site: max_events}
3789 self.datasets_information[dataset_name][
"num_events"][run_number] = max_events
3797 """Check list of dataset names for impossible ones. 3799 Two kinds of checks are done: 3800 - Checks for things that do not make sense. These lead to 3801 errors and skipped datasets. 3802 - Sanity checks. For these warnings are issued but the user is 3803 considered to be the authoritative expert. 3806 - The CMSSW version encoded in the dataset name should match 3807 self.cmssw_version. This is critical. 3808 - There should be some events in the dataset/run. This is 3809 critical in the sense that CRAB refuses to create jobs for 3810 zero events. And yes, this does happen in practice. E.g. the 3811 reprocessed CRAFT08 datasets contain runs with zero events. 3812 - A cursory check is performed to see if the harvesting type 3813 makes sense for the data type. This should prevent the user 3814 from inadvertently running RelVal for data. 3815 - It is not possible to run single-step harvesting jobs on 3816 samples that are not fully contained at a single site. 3817 - Each dataset/run has to be available at at least one site. 3821 self.logger.info(
"Performing sanity checks on dataset list...")
3823 dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3825 for dataset_name
in self.datasets_to_use.keys():
3828 version_from_dataset = self.datasets_information[dataset_name] \
3830 if version_from_dataset != self.cmssw_version:
3831 msg =
" CMSSW version mismatch for dataset `%s' " \
3834 self.cmssw_version, version_from_dataset)
3835 if self.force_running:
3837 self.logger.warning(
"%s " \
3838 "--> `force mode' active: " \
3841 del dataset_names_after_checks[dataset_name]
3842 self.logger.warning(
"%s " \
3843 "--> skipping" % msg)
3854 datatype = self.datasets_information[dataset_name][
"datatype"]
3855 if datatype ==
"data":
3857 if self.harvesting_type !=
"DQMOffline":
3859 elif datatype ==
"mc":
3860 if self.harvesting_type ==
"DQMOffline":
3864 assert False,
"ERROR Impossible data type `%s' " \
3865 "for dataset `%s'" % \
3866 (datatype, dataset_name)
3868 msg =
" Normally one does not run `%s' harvesting " \
3869 "on %s samples, are you sure?" % \
3870 (self.harvesting_type, datatype)
3871 if self.force_running:
3872 self.logger.warning(
"%s " \
3873 "--> `force mode' active: " \
3876 del dataset_names_after_checks[dataset_name]
3877 self.logger.warning(
"%s " \
3878 "--> skipping" % msg)
3892 if datatype ==
"data":
3893 if self.globaltag
is None:
3894 msg =
"For data datasets (like `%s') " \
3895 "we need a GlobalTag" % \
3897 del dataset_names_after_checks[dataset_name]
3898 self.logger.warning(
"%s " \
3899 "--> skipping" % msg)
3909 globaltag = self.datasets_information[dataset_name][
"globaltag"]
3910 if not globaltag
in self.globaltag_check_cache:
3911 if self.check_globaltag(globaltag):
3912 self.globaltag_check_cache.append(globaltag)
3914 msg =
"Something is wrong with GlobalTag `%s' " \
3915 "used by dataset `%s'!" % \
3916 (globaltag, dataset_name)
3917 if self.use_ref_hists:
3918 msg +=
"\n(Either it does not exist or it " \
3919 "does not contain the required key to " \
3920 "be used with reference histograms.)" 3922 msg +=
"\n(It probably just does not exist.)" 3923 self.logger.fatal(msg)
3929 runs_without_sites = [i
for (i, j)
in \
3930 self.datasets_information[dataset_name] \
3933 i
in self.datasets_to_use[dataset_name]]
3934 if len(runs_without_sites) > 0:
3935 for run_without_sites
in runs_without_sites:
3937 dataset_names_after_checks[dataset_name].
remove(run_without_sites)
3940 self.logger.warning(
" removed %d unavailable run(s) " \
3941 "from dataset `%s'" % \
3942 (len(runs_without_sites), dataset_name))
3943 self.logger.debug(
" (%s)" % \
3945 runs_without_sites]))
3951 if not self.harvesting_mode ==
"two-step":
3952 for run_number
in self.datasets_to_use[dataset_name]:
3957 num_sites = len(self.datasets_information[dataset_name] \
3958 [
"sites"][run_number])
3959 if num_sites > 1
and \
3960 not self.datasets_information[dataset_name] \
3961 [
"mirrored"][run_number]:
3965 msg =
" Dataset `%s', run %d is spread across more " \
3966 "than one site.\n" \
3967 " Cannot run single-step harvesting on " \
3968 "samples spread across multiple sites" % \
3969 (dataset_name, run_number)
3971 dataset_names_after_checks[dataset_name].
remove(run_number)
3974 self.logger.warning(
"%s " \
3975 "--> skipping" % msg)
3984 tmp = [j
for (i, j)
in self.datasets_information \
3985 [dataset_name][
"num_events"].
items() \
3986 if i
in self.datasets_to_use[dataset_name]]
3987 num_events_dataset = sum(tmp)
3989 if num_events_dataset < 1:
3990 msg =
" dataset `%s' is empty" % dataset_name
3991 del dataset_names_after_checks[dataset_name]
3992 self.logger.warning(
"%s " \
3993 "--> skipping" % msg)
4006 self.datasets_information[dataset_name] \
4007 [
"num_events"].
items()
if i[1] < 1]
4008 tmp = [i
for i
in tmp
if i[0]
in self.datasets_to_use[dataset_name]]
4009 empty_runs =
dict(tmp)
4010 if len(empty_runs) > 0:
4011 for empty_run
in empty_runs:
4013 dataset_names_after_checks[dataset_name].
remove(empty_run)
4016 self.logger.info(
" removed %d empty run(s) from dataset `%s'" % \
4017 (len(empty_runs), dataset_name))
4018 self.logger.debug(
" (%s)" % \
4019 ", ".
join([
str(i)
for i
in empty_runs]))
4025 dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4026 for (dataset_name, runs)
in dataset_names_after_checks.iteritems():
4028 self.logger.warning(
" Removing dataset without any runs " \
4031 del dataset_names_after_checks_tmp[dataset_name]
4032 dataset_names_after_checks = dataset_names_after_checks_tmp
4036 self.logger.warning(
" --> Removed %d dataset(s)" % \
4037 (len(self.datasets_to_use) -
4038 len(dataset_names_after_checks)))
4041 self.datasets_to_use = dataset_names_after_checks
4048 """Escape a DBS dataset name. 4050 Escape a DBS dataset name such that it does not cause trouble 4051 with the file system. This means turning each `/' into `__', 4052 except for the first one which is just removed. 4056 escaped_dataset_name = dataset_name
4057 escaped_dataset_name = escaped_dataset_name.strip(
"/")
4058 escaped_dataset_name = escaped_dataset_name.replace(
"/",
"__")
4060 return escaped_dataset_name
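    # Illustration only (not part of the original script): the escaping
    # above turns an assumed DBS path like
    #   /MinimumBias/Run2010A-Dec22ReReco_v1/DQM
    # into
    #   MinimumBias__Run2010A-Dec22ReReco_v1__DQM
    # which is safe to embed in file and directory names:
    #
    #   >>> "/MinimumBias/Run2010A-Dec22ReReco_v1/DQM".strip("/").replace("/", "__")
    #   'MinimumBias__Run2010A-Dec22ReReco_v1__DQM'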
4067 """Generate the name of the configuration file to be run by 4070 Depending on the harvesting mode (single-step or two-step) 4071 this is the name of the real harvesting configuration or the 4072 name of the first-step ME summary extraction configuration. 4076 if self.harvesting_mode ==
"single-step":
4077 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4078 elif self.harvesting_mode ==
"single-step-allow-partial":
4079 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4086 elif self.harvesting_mode ==
"two-step":
4087 config_file_name = self.create_me_summary_config_file_name(dataset_name)
4089 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4090 self.harvesting_mode
4093 return config_file_name
4099 "Generate the name to be used for the harvesting config file." 4101 file_name_base =
"harvesting.py" 4102 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4103 config_file_name = file_name_base.replace(
".py",
4105 dataset_name_escaped)
4108 return config_file_name
4113 "Generate the name of the ME summary extraction config file." 4115 file_name_base =
"me_extraction.py" 4116 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4117 config_file_name = file_name_base.replace(
".py",
4119 dataset_name_escaped)
4122 return config_file_name
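    # Illustration only (not part of the original script): with the assumed
    # dataset name used in these comments, the two helpers above yield
    #   harvesting_MinimumBias__Run2010A-Dec22ReReco_v1__DQM.py
    #   me_extraction_MinimumBias__Run2010A-Dec22ReReco_v1__DQM.py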
4127 """Create the name of the output file name to be used. 4129 This is the name of the output file of the `first step'. In 4130 the case of single-step harvesting this is already the final 4131 harvesting output ROOT file. In the case of two-step 4132 harvesting it is the name of the intermediary ME summary 4145 if self.harvesting_mode ==
"single-step":
4147 assert not run_number
is None 4149 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4150 elif self.harvesting_mode ==
"single-step-allow-partial":
4152 assert not run_number
is None 4154 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4155 elif self.harvesting_mode ==
"two-step":
4157 assert run_number
is None 4159 output_file_name = self.create_me_summary_output_file_name(dataset_name)
4162 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4163 self.harvesting_mode
4166 return output_file_name
4171 """Generate the name to be used for the harvesting output file. 4173 This harvesting output file is the _final_ ROOT output file 4174 containing the harvesting results. In case of two-step 4175 harvesting there is an intermediate ME output file as well. 4179 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4187 output_file_name =
"DQM_V0001_R%09d__%s.root" % \
4188 (run_number, dataset_name_escaped)
4189 if self.harvesting_mode.find(
"partial") > -1:
4192 if self.datasets_information[dataset_name] \
4193 [
"mirrored"][run_number] ==
False:
4194 output_file_name = output_file_name.replace(
".root", \
4198 return output_file_name
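    # Illustration only (not part of the original script): for an assumed
    # run 123456 of the dataset used in these comments, the convention
    # above gives
    #   DQM_V0001_R000123456__MinimumBias__Run2010A-Dec22ReReco_v1__DQM.root
    # with "_partial" inserted before ".root" for partially harvested,
    # non-mirrored runs.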
4203 """Generate the name of the intermediate ME file name to be 4204 used in two-step harvesting. 4208 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4209 output_file_name =
"me_summary_%s.root" % \
4210 dataset_name_escaped
4213 return output_file_name
4218 """Create the block name to use for this dataset/run number. 4220 This is what appears in the brackets `[]' in multicrab.cfg. It 4221 is used as the name of the job and to create output 4226 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4227 block_name =
"%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4235 """Create a CRAB configuration for a given job. 4237 NOTE: This is _not_ a complete (as in: submittable) CRAB 4238 configuration. It is used to store the common settings for the 4239 multicrab configuration. 4241 NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported. 4243 NOTE: According to CRAB, you `Must define exactly two of 4244 total_number_of_events, events_per_job, or 4245 number_of_jobs.'. For single-step harvesting we force one job, 4246 for the rest we don't really care. 4249 # With the current version of CRAB (2.6.1), in which Daniele 4250 # fixed the behaviour of no_block_boundary for me, one _has to 4251 # specify_ the total_number_of_events and one single site in 4252 # the se_white_list. 4260 castor_prefix = self.castor_prefix
4262 tmp.append(self.config_file_header())
4267 tmp.append(
"[CRAB]")
4268 tmp.append(
"jobtype = cmssw")
4273 tmp.append(
"[GRID]")
4274 tmp.append(
"virtual_organization=cms")
4279 tmp.append(
"[USER]")
4280 tmp.append(
"copy_data = 1")
4285 tmp.append(
"[CMSSW]")
4286 tmp.append(
"# This reveals data hosted on T1 sites,")
4287 tmp.append(
"# which is normally hidden by CRAB.")
4288 tmp.append(
"show_prod = 1")
4289 tmp.append(
"number_of_jobs = 1")
4290 if self.Jsonlumi ==
True:
4291 tmp.append(
"lumi_mask = %s" % self.Jsonfilename)
4292 tmp.append(
"total_number_of_lumis = -1")
4294 if self.harvesting_type ==
"DQMOffline":
4295 tmp.append(
"total_number_of_lumis = -1")
4297 tmp.append(
"total_number_of_events = -1")
4298 if self.harvesting_mode.find(
"single-step") > -1:
4299 tmp.append(
"# Force everything to run in one job.")
4300 tmp.append(
"no_block_boundary = 1")
4307 crab_config =
"\n".
join(tmp)
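    # Illustration only (not part of the original script): for a
    # single-step DQMOffline job without a JSON lumi mask, the appends
    # above render a crab.cfg skeleton along these lines (the header
    # comment is omitted):
    #
    #   [CRAB]
    #   jobtype = cmssw
    #
    #   [GRID]
    #   virtual_organization=cms
    #
    #   [USER]
    #   copy_data = 1
    #
    #   [CMSSW]
    #   # This reveals data hosted on T1 sites,
    #   # which is normally hidden by CRAB.
    #   show_prod = 1
    #   number_of_jobs = 1
    #   total_number_of_lumis = -1
    #   # Force everything to run in one job.
    #   no_block_boundary = 1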
4315 """Create a multicrab.cfg file for all samples. 4317 This creates the contents for a multicrab.cfg file that uses 4318 the crab.cfg file (generated elsewhere) for the basic settings 4319 and contains blocks for each run of each dataset. 4322 # The fact that it's necessary to specify the se_white_list 4323 # and the total_number_of_events is due to our use of CRAB 4324 # version 2.6.1. This should no longer be necessary in the 4330 cmd=
"who i am | cut -f1 -d' '" 4331 (status, output)=commands.getstatusoutput(cmd)
4334 if self.caf_access ==
True:
4335 print "Extracting %s as user name" %UserName
4337 number_max_sites = self.nr_max_sites + 1
4339 multicrab_config_lines = []
4340 multicrab_config_lines.append(self.config_file_header())
4341 multicrab_config_lines.append(
"")
4342 multicrab_config_lines.append(
"[MULTICRAB]")
4343 multicrab_config_lines.append(
"cfg = crab.cfg")
4344 multicrab_config_lines.append(
"")
4346 dataset_names = self.datasets_to_use.keys()
4347 dataset_names.sort()
4349 for dataset_name
in dataset_names:
4350 runs = self.datasets_to_use[dataset_name]
4351 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4352 castor_prefix = self.castor_prefix
4357 castor_dir = self.datasets_information[dataset_name] \
4358 [
"castor_path"][run]
4360 cmd =
"rfdir %s" % castor_dir
4361 (status, output) = commands.getstatusoutput(cmd)
4363 if len(output) <= 0:
4369 assert (len(self.datasets_information[dataset_name] \
4370 [
"sites"][run]) == 1)
or \
4371 self.datasets_information[dataset_name][
"mirrored"]
4374 site_names = self.datasets_information[dataset_name] \
4375 [
"sites"][run].
keys()
4377 for i
in range(1, number_max_sites, 1):
4378 if len(site_names) > 0:
4379 index =
"site_%02d" % (i)
4381 config_file_name = self. \
4383 output_file_name = self. \
4394 if len(site_names) > 1:
4395 cmssw_version = self.datasets_information[dataset_name] \
4397 self.logger.info(
"Picking site for mirrored dataset " \
4399 (dataset_name, run))
4400 site_name = self.pick_a_site(site_names, cmssw_version)
4401 if site_name
in site_names:
4402 site_names.remove(site_name)
4405 site_name = site_names[0]
4406 site_names.remove(site_name)
4408 if site_name
is self.no_matching_site_found_str:
4412 nevents = self.datasets_information[dataset_name][
"num_events"][run]
4415 multicrab_block_name = self.create_multicrab_block_name( \
4416 dataset_name, run, index)
4417 multicrab_config_lines.append(
"[%s]" % \
4418 multicrab_block_name)
4422 if site_name ==
"caf.cern.ch":
4423 multicrab_config_lines.append(
"CRAB.use_server=0")
4424 multicrab_config_lines.append(
"CRAB.scheduler=caf")
4426 multicrab_config_lines.append(
"scheduler = glite")
4430 if site_name ==
"caf.cern.ch":
4433 multicrab_config_lines.append(
"GRID.se_white_list = %s" % \
4435 multicrab_config_lines.append(
"# This removes the default blacklisting of T1 sites.")
4436 multicrab_config_lines.append(
"GRID.remove_default_blacklist = 1")
4437 multicrab_config_lines.append(
"GRID.rb = CERN")
4438 if not self.non_t1access:
4439 multicrab_config_lines.append(
"GRID.role = t1access")
4444 castor_dir = castor_dir.replace(castor_prefix,
"")
4445 multicrab_config_lines.append(
"USER.storage_element=srm-cms.cern.ch")
4446 multicrab_config_lines.append(
"USER.user_remote_dir = %s" % \
4448 multicrab_config_lines.append(
"USER.check_user_remote_dir=0")
4450 if site_name ==
"caf.cern.ch":
4451 multicrab_config_lines.append(
"USER.storage_path=%s" % castor_prefix)
4457 multicrab_config_lines.append(
"USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4464 multicrab_config_lines.append(
"CMSSW.pset = %s" % \
4466 multicrab_config_lines.append(
"CMSSW.datasetpath = %s" % \
4468 multicrab_config_lines.append(
"CMSSW.runselection = %d" % \
4471 if self.Jsonlumi ==
True:
4474 if self.harvesting_type ==
"DQMOffline":
4477 multicrab_config_lines.append(
"CMSSW.total_number_of_events = %d" % \
4480 multicrab_config_lines.append(
"CMSSW.output_file = %s" % \
4485 if site_name ==
"caf.cern.ch":
4486 multicrab_config_lines.append(
"CAF.queue=cmscaf1nd")
4490 multicrab_config_lines.append(
"")
4494 self.all_sites_found =
True 4496 multicrab_config =
"\n".
join(multicrab_config_lines)
4499 return multicrab_config
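    # Illustration only (not part of the original script): one rendered
    # multicrab.cfg block for an assumed dataset/run at a plain grid site
    # looks roughly like the following; <...> marks values filled in at
    # run time.
    #
    #   [MinimumBias__Run2010A-Dec22ReReco_v1__DQM_000123456_site_01]
    #   scheduler = glite
    #   GRID.se_white_list = <site name>
    #   # This removes the default blacklisting of T1 sites.
    #   GRID.remove_default_blacklist = 1
    #   GRID.rb = CERN
    #   GRID.role = t1access
    #   USER.storage_element=srm-cms.cern.ch
    #   USER.user_remote_dir = <castor_dir without the castor prefix>
    #   USER.check_user_remote_dir=0
    #   USER.storage_path=/srm/managerv2?SFN=<castor_prefix>
    #   CMSSW.pset = harvesting_MinimumBias__Run2010A-Dec22ReReco_v1__DQM.py
    #   CMSSW.datasetpath = /MinimumBias/Run2010A-Dec22ReReco_v1/DQM
    #   CMSSW.runselection = 123456
    #   CMSSW.output_file = DQM_V0001_R000123456__MinimumBias__Run2010A-Dec22ReReco_v1__DQM.root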
4504 """Check if globaltag exists. 4506 Check if globaltag exists as GlobalTag in the database given 4507 by self.frontier_connection_name['globaltag']. If globaltag is 4508 None, self.globaltag is used instead. 4510 If we're going to use reference histograms this method also 4511 checks for the existence of the required key in the GlobalTag. 4515 if globaltag
is None:
4516 globaltag = self.globaltag
4519 if globaltag.endswith(
"::All"):
4520 globaltag = globaltag[:-5]
4522 connect_name = self.frontier_connection_name[
"globaltag"]
4530 connect_name = connect_name.replace(
"frontier://",
4531 "frontier://cmsfrontier:8000/")
4533 connect_name += self.db_account_name_cms_cond_globaltag()
4535 tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4539 tag_contains_ref_hist_key =
False 4540 if self.use_ref_hists
and tag_exists:
4542 tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4546 if self.use_ref_hists:
4547 ret_val = tag_exists
and tag_contains_ref_hist_key
4549 ret_val = tag_exists
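    # Illustration only (not part of the original script): a GlobalTag
    # passed as, say, "GR_R_38X_V9::All" (a made-up example) is checked as
    # "GR_R_38X_V9", and the frontier connection string is expanded to
    # point at "frontier://cmsfrontier:8000/" plus the GlobalTag database
    # account before the existence checks are run.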
4559 """Check if globaltag exists. 4563 self.logger.info(
"Checking existence of GlobalTag `%s'" % \
4565 self.logger.debug(
" (Using database connection `%s')" % \
4568 cmd =
"cmscond_tagtree_list -c %s -T %s" % \
4569 (connect_name, globaltag)
4570 (status, output) = commands.getstatusoutput(cmd)
4572 output.find(
"error") > -1:
4573 msg =
"Could not check existence of GlobalTag `%s' in `%s'" % \
4574 (globaltag, connect_name)
4575 if output.find(
".ALL_TABLES not found") > -1:
4577 "Missing database account `%s'" % \
4578 (msg, output.split(
".ALL_TABLES")[0].
split()[-1])
4579 self.logger.fatal(msg)
4580 self.logger.debug(
"Command used:")
4581 self.logger.debug(
" %s" % cmd)
4582 self.logger.debug(
"Output received:")
4583 self.logger.debug(output)
4585 if output.find(
"does not exist") > -1:
4586 self.logger.debug(
"GlobalTag `%s' does not exist in `%s':" % \
4587 (globaltag, connect_name))
4588 self.logger.debug(
"Output received:")
4589 self.logger.debug(output)
4593 self.logger.info(
" GlobalTag exists? -> %s" % tag_exists)
4601 """Check if globaltag contains the required RefHistos key. 4606 tag_contains_key =
None 4607 ref_hist_key =
"RefHistos" 4608 self.logger.info(
"Checking existence of reference " \
4609 "histogram key `%s' in GlobalTag `%s'" % \
4610 (ref_hist_key, globaltag))
4611 self.logger.debug(
" (Using database connection `%s')" % \
4613 cmd =
"cmscond_tagtree_list -c %s -T %s -n %s" % \
4614 (connect_name, globaltag, ref_hist_key)
4615 (status, output) = commands.getstatusoutput(cmd)
4617 output.find(
"error") > -1:
4618 msg =
"Could not check existence of key `%s'" % \
4619 (ref_hist_key, connect_name)
4620 self.logger.fatal(msg)
4621 self.logger.debug(
"Command used:")
4622 self.logger.debug(
" %s" % cmd)
4623 self.logger.debug(
"Output received:")
4624 self.logger.debug(
" %s" % output)
4627 self.logger.debug(
"Required key for use of reference " \
4628 "histograms `%s' does not exist " \
4629 "in GlobalTag `%s':" % \
4630 (ref_hist_key, globaltag))
4631 self.logger.debug(
"Output received:")
4632 self.logger.debug(output)
4633 tag_contains_key =
False 4635 tag_contains_key =
True 4637 self.logger.info(
" GlobalTag contains `%s' key? -> %s" % \
4638 (ref_hist_key, tag_contains_key))
4641 return tag_contains_key
4646 """Check the existence of tag_name in database connect_name. 4648 Check if tag_name exists as a reference histogram tag in the 4649 database given by self.frontier_connection_name['refhists']. 4653 connect_name = self.frontier_connection_name[
"refhists"]
4654 connect_name += self.db_account_name_cms_cond_dqm_summary()
4656 self.logger.debug(
"Checking existence of reference " \
4657 "histogram tag `%s'" % \
4659 self.logger.debug(
" (Using database connection `%s')" % \
4662 cmd =
"cmscond_list_iov -c %s" % \
4664 (status, output) = commands.getstatusoutput(cmd)
4666 msg =
"Could not check existence of tag `%s' in `%s'" % \
4667 (tag_name, connect_name)
4668 self.logger.fatal(msg)
4669 self.logger.debug(
"Command used:")
4670 self.logger.debug(
" %s" % cmd)
4671 self.logger.debug(
"Output received:")
4672 self.logger.debug(output)
4674 if not tag_name
in output.split():
4675 self.logger.debug(
"Reference histogram tag `%s' " \
4676 "does not exist in `%s'" % \
4677 (tag_name, connect_name))
4678 self.logger.debug(
" Existing tags: `%s'" % \
4679 "', `".
join(output.split()))
4683 self.logger.debug(
" Reference histogram tag exists? " \
4684 "-> %s" % tag_exists)
4692 """Build the es_prefer snippet for the reference histograms. 4694 The building of the snippet is wrapped in some care-taking 4695 code that figures out the name of the reference histogram set 4696 and makes sure the corresponding tag exists. 4702 ref_hist_tag_name = self.ref_hist_mappings[dataset_name]
4704 connect_name = self.frontier_connection_name[
"refhists"]
4705 connect_name += self.db_account_name_cms_cond_dqm_summary()
4706 record_name =
"DQMReferenceHistogramRootFileRcd" 4710 code_lines.append(
"from CondCore.DBCommon.CondDBSetup_cfi import *")
4711 code_lines.append(
"process.ref_hist_source = cms.ESSource(\"PoolDBESSource\", CondDBSetup,")
4712 code_lines.append(
" connect = cms.string(\"%s\")," % connect_name)
4713 code_lines.append(
" toGet = cms.VPSet(cms.PSet(record = cms.string(\"%s\")," % record_name)
4714 code_lines.append(
" tag = cms.string(\"%s\"))," % ref_hist_tag_name)
4715 code_lines.append(
" )")
4716 code_lines.append(
" )")
4717 code_lines.append(
"process.es_prefer_ref_hist_source = cms.ESPrefer(\"PoolDBESSource\", \"ref_hist_source\")")
4719 snippet =
"\n".
join(code_lines)
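    # Illustration only (not part of the original script): with the connect
    # name and reference histogram tag resolved above, the generated
    # snippet reads roughly:
    #
    #   from CondCore.DBCommon.CondDBSetup_cfi import *
    #   process.ref_hist_source = cms.ESSource("PoolDBESSource", CondDBSetup,
    #       connect = cms.string("<connect_name>"),
    #       toGet = cms.VPSet(cms.PSet(record = cms.string("DQMReferenceHistogramRootFileRcd"),
    #                                  tag = cms.string("<ref_hist_tag_name>")),
    #                         )
    #       )
    #   process.es_prefer_ref_hist_source = cms.ESPrefer("PoolDBESSource", "ref_hist_source")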
4727 """Create the Python harvesting configuration for harvesting. 4729 The basic configuration is created by 4730 Configuration.PyReleaseValidation.ConfigBuilder. (This mimics 4731 what cmsDriver.py does.) After that we add some specials 4734 NOTE: On one hand it may not be nice to circumvent 4735 cmsDriver.py, on the other hand cmsDriver.py does not really 4736 do anything itself. All the real work is done by the 4737 ConfigBuilder so there is not much risk that we miss out on 4738 essential developments of cmsDriver in the future. 4743 config_options = defaultOptions
4748 config_options.name =
"harvesting" 4749 config_options.scenario =
"pp" 4750 config_options.number = 1
4751 config_options.arguments = self.ident_string()
4752 config_options.evt_type = config_options.name
4753 config_options.customisation_file =
None 4754 config_options.filein =
"dummy_value" 4755 config_options.filetype =
"EDM" 4757 config_options.gflash =
"dummy_value" 4761 config_options.dbsquery =
"" 4768 config_options.step =
"HARVESTING:%s" % \
4769 self.harvesting_info[self.harvesting_type] \
4771 config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4773 config_options.eventcontent = self.harvesting_info \
4774 [self.harvesting_type] \
4776 config_options.harvesting = self.harvesting_info \
4777 [self.harvesting_type] \
4784 datatype = self.datasets_information[dataset_name][
"datatype"]
4785 config_options.isMC = (datatype.lower() ==
"mc")
4786 config_options.isData = (datatype.lower() ==
"data")
4787 globaltag = self.datasets_information[dataset_name][
"globaltag"]
4789 config_options.conditions = self.format_conditions_string(globaltag)
4793 if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4795 config_builder =
ConfigBuilder(config_options, with_input=
True)
4799 config_builder.prepare(
True)
4800 config_contents = config_builder.pythonCfgCode
4809 marker_lines.append(sep)
4810 marker_lines.append(
"# Code between these markers was generated by")
4811 marker_lines.append(
"# Configuration.PyReleaseValidation." \
4814 marker_lines.append(sep)
4815 marker =
"\n".
join(marker_lines)
4817 tmp = [self.config_file_header()]
4821 tmp.append(config_contents)
4825 config_contents =
"\n".
join(tmp)
4830 customisations = [
""]
4832 customisations.append(
"# Now follow some customisations")
4833 customisations.append(
"")
4834 connect_name = self.frontier_connection_name[
"globaltag"]
4835 connect_name += self.db_account_name_cms_cond_globaltag()
4836 customisations.append(
"process.GlobalTag.connect = \"%s\"" % \
4840 if self.saveByLumiSection ==
True:
4841 customisations.append(
"process.dqmSaver.saveByLumiSection = 1")
4845 customisations.append(
"")
4859 use_es_prefer = (self.harvesting_type ==
"RelVal")
4860 use_refs = use_es_prefer
or \
4861 (
not self.harvesting_type ==
"MC")
4863 use_refs = use_refs
and self.use_ref_hists
4871 customisations.append(
"print \"Not using reference histograms\"")
4872 customisations.append(
"if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4873 customisations.append(
" for (sequence_name, sequence) in process.sequences.iteritems():")
4874 customisations.append(
" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4875 customisations.append(
" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4876 customisations.append(
" sequence_name")
4877 customisations.append(
"process.dqmSaver.referenceHandling = \"skip\"")
4881 customisations.append(
"process.dqmSaver.referenceHandling = \"all\"")
4883 es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4884 customisations.append(es_prefer_snippet)
4888 workflow_name = dataset_name
4889 if self.harvesting_mode ==
"single-step-allow-partial":
4890 workflow_name +=
"_partial" 4891 customisations.append(
"process.dqmSaver.workflow = \"%s\"" % \
4928 config_contents = config_contents +
"\n".
join(customisations)
4933 return config_contents
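    # Illustration only (not part of the original script): the customisations
    # appended above end up as lines like these at the bottom of the
    # generated harvesting configuration (values depend on the dataset
    # and the command-line options):
    #
    #   # Now follow some customisations
    #   process.GlobalTag.connect = "<globaltag connect string>"
    #   process.dqmSaver.referenceHandling = "all"
    #   process.dqmSaver.workflow = "/MinimumBias/Run2010A-Dec22ReReco_v1/DQM"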
    def create_me_extraction_config(self, dataset_name):
        "Create the first-step ME extraction config for two-step harvesting."

        tmp = []

        tmp.append(self.config_file_header())
        tmp.append("")
        tmp.append("import FWCore.ParameterSet.Config as cms")
        tmp.append("")
        tmp.append("process = cms.Process(\"ME2EDM\")")
        tmp.append("")
        tmp.append("# Import of standard configurations")
        tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
        tmp.append("")
        tmp.append("# We don't really process any events, just keep this set to one to")
        tmp.append("# make sure things work.")
        tmp.append("process.maxEvents = cms.untracked.PSet(")
        tmp.append("    input = cms.untracked.int32(1)")
        tmp.append("    )")
        tmp.append("")
        tmp.append("process.options = cms.untracked.PSet(")
        tmp.append("    Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
        tmp.append("    )")
        tmp.append("")
        tmp.append("process.source = cms.Source(\"PoolSource\",")
        tmp.append("                            processingMode = \\")
        tmp.append("                            cms.untracked.string(\"RunsAndLumis\"),")
        tmp.append("                            fileNames = \\")
        tmp.append("                            cms.untracked.vstring(\"no_file_specified\")")
        tmp.append("                            )")
        tmp.append("")
        tmp.append("# Output definition: drop everything except for the monitoring.")
        tmp.append("process.output = cms.OutputModule(")
        tmp.append("    \"PoolOutputModule\",")
        tmp.append("    outputCommands = \\")
        tmp.append("    cms.untracked.vstring(\"drop *\", \\")
        tmp.append("                          \"keep *_MEtoEDMConverter_*_*\"),")
        output_file_name = self. \
                           create_me_summary_output_file_name(dataset_name)
        tmp.append("    fileName = \\")
        tmp.append("    cms.untracked.string(\"%s\")," % output_file_name)
        tmp.append("    dataset = cms.untracked.PSet(")
        tmp.append("        dataTier = cms.untracked.string(\"RECO\"),")
        tmp.append("        filterName = cms.untracked.string(\"\")")
        tmp.append("        )")
        tmp.append("    )")
        tmp.append("")
        tmp.append("# Additional output definition")
        tmp.append("process.out_step = cms.EndPath(process.output)")
        tmp.append("")
        tmp.append("# Schedule definition")
        tmp.append("process.schedule = cms.Schedule(process.out_step)")
        tmp.append("")

        config_contents = "\n".join(tmp)

        return config_contents
5050 """Write a CRAB job configuration Python file. 5054 self.logger.info(
"Writing CRAB configuration...")
5056 file_name_base =
"crab.cfg" 5059 crab_contents = self.create_crab_config()
5062 crab_file_name = file_name_base
5064 crab_file =
file(crab_file_name,
"w")
5065 crab_file.write(crab_contents)
5068 self.logger.fatal(
"Could not write " \
5069 "CRAB configuration to file `%s'" % \
5071 raise Error(
"ERROR: Could not write to file `%s'!" % \
5079 """Write a multi-CRAB job configuration Python file. 5083 self.logger.info(
"Writing multi-CRAB configuration...")
5085 file_name_base =
"multicrab.cfg" 5088 multicrab_contents = self.create_multicrab_config()
5091 multicrab_file_name = file_name_base
5093 multicrab_file =
file(multicrab_file_name,
"w")
5094 multicrab_file.write(multicrab_contents)
5095 multicrab_file.close()
5097 self.logger.fatal(
"Could not write " \
5098 "multi-CRAB configuration to file `%s'" % \
5099 multicrab_file_name)
5100 raise Error(
"ERROR: Could not write to file `%s'!" % \
5101 multicrab_file_name)
5108 """Write a harvesting job configuration Python file. 5110 NOTE: This knows nothing about single-step or two-step 5111 harvesting. That's all taken care of by 5112 create_harvesting_config. 5116 self.logger.debug(
"Writing harvesting configuration for `%s'..." % \
5120 config_contents = self.create_harvesting_config(dataset_name)
5123 config_file_name = self. \
5126 config_file =
file(config_file_name,
"w")
5127 config_file.write(config_contents)
5130 self.logger.fatal(
"Could not write " \
5131 "harvesting configuration to file `%s'" % \
5133 raise Error(
"ERROR: Could not write to file `%s'!" % \
5141 """Write an ME-extraction configuration Python file. 5143 This `ME-extraction' (ME = Monitoring Element) is the first 5144 step of the two-step harvesting. 5148 self.logger.debug(
"Writing ME-extraction configuration for `%s'..." % \
5152 config_contents = self.create_me_extraction_config(dataset_name)
5155 config_file_name = self. \
5158 config_file =
file(config_file_name,
"w")
5159 config_file.write(config_contents)
5162 self.logger.fatal(
"Could not write " \
5163 "ME-extraction configuration to file `%s'" % \
5165 raise Error(
"ERROR: Could not write to file `%s'!" % \
5174 """Check if we need to load and check the reference mappings. 5176 For data the reference histograms should be taken 5177 automatically from the GlobalTag, so we don't need any 5178 mappings. For RelVals we need to know a mapping to be used in 5179 the es_prefer code snippet (different references for each of 5182 WARNING: This implementation is a bit convoluted. 5188 if not dataset_name
is None:
5189 data_type = self.datasets_information[dataset_name] \
5191 mappings_needed = (data_type ==
"mc")
5193 if not mappings_needed:
5194 assert data_type ==
"data" 5197 tmp = [self.ref_hist_mappings_needed(dataset_name) \
5198 for dataset_name
in \
5199 self.datasets_information.keys()]
5200 mappings_needed = (
True in tmp)
5203 return mappings_needed
5208 """Load the reference histogram mappings from file. 5210 The dataset name to reference histogram name mappings are read 5211 from a text file specified in self.ref_hist_mappings_file_name. 5216 assert len(self.ref_hist_mappings) < 1, \
5217 "ERROR Should not be RE-loading " \
5218 "reference histogram mappings!" 5221 self.logger.info(
"Loading reference histogram mappings " \
5222 "from file `%s'" % \
5223 self.ref_hist_mappings_file_name)
5225 mappings_lines =
None 5227 mappings_file =
file(self.ref_hist_mappings_file_name,
"r") 5228 mappings_lines = mappings_file.readlines() 5229 mappings_file.close() 5231 msg =
"ERROR: Could not open reference histogram mapping "\
5232 "file `%s'" % self.ref_hist_mappings_file_name
5233 self.logger.fatal(msg)
5243 for mapping
in mappings_lines:
5245 if not mapping.startswith(
"#"):
5246 mapping = mapping.strip()
5247 if len(mapping) > 0:
5248 mapping_pieces = mapping.split()
5249 if len(mapping_pieces) != 2:
5250 msg =
"ERROR: The reference histogram mapping " \
5251 "file contains a line I don't " \
5252 "understand:\n %s" % mapping
5253 self.logger.fatal(msg)
5255 dataset_name = mapping_pieces[0].
strip()
5256 ref_hist_name = mapping_pieces[1].
strip()
5260 if dataset_name
in self.ref_hist_mappings:
5261 msg =
"ERROR: The reference histogram mapping " \
5262 "file contains multiple mappings for " \
5264 self.logger.fatal(msg)
5268 self.ref_hist_mappings[dataset_name] = ref_hist_name
5272 self.logger.info(
" Successfully loaded %d mapping(s)" % \
5273 len(self.ref_hist_mappings))
5274 max_len =
max([len(i)
for i
in self.ref_hist_mappings.keys()])
5275 for (map_from, map_to)
in self.ref_hist_mappings.iteritems():
5276 self.logger.info(
" %-*s -> %s" % \
5277 (max_len, map_from, map_to))
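    # Illustration only (not part of the original script): the mapping file
    # parsed above is plain text with exactly two whitespace-separated
    # columns per line (dataset name, reference histogram tag) and `#'
    # starting comment lines. The entries below are made-up examples:
    #
    #   # dataset_name  ref_hist_tag
    #   /RelValMinBias/CMSSW_3_8_2-MC_38Y_V9-v1/GEN-SIM-RECO  ref_hist_tag_minbias
    #   /RelValTTbar/CMSSW_3_8_2-MC_38Y_V9-v1/GEN-SIM-RECO    ref_hist_tag_ttbar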
5284 """Make sure all necessary reference histograms exist. 5286 Check that for each of the datasets to be processed a 5287 reference histogram is specified and that that histogram 5288 exists in the database. 5290 NOTE: There's a little complication here. Since this whole 5291 thing was designed to allow (in principle) harvesting of both 5292 data and MC datasets in one go, we need to be careful to check 5293 the availability fof reference mappings only for those 5294 datasets that need it. 5298 self.logger.info(
"Checking reference histogram mappings")
5300 for dataset_name
in self.datasets_to_use:
5302 ref_hist_name = self.ref_hist_mappings[dataset_name]
5304 msg =
"ERROR: No reference histogram mapping found " \
5305 "for dataset `%s'" % \
5307 self.logger.fatal(msg)
5310 if not self.check_ref_hist_tag(ref_hist_name):
5311 msg =
"Reference histogram tag `%s' " \
5312 "(used for dataset `%s') does not exist!" % \
5313 (ref_hist_name, dataset_name)
5314 self.logger.fatal(msg)
5317 self.logger.info(
" Done checking reference histogram mappings.")
5324 """Obtain all information on the datasets that we need to run. 5326 Use DBS to figure out all required information on our 5327 datasets, like the run numbers and the GlobalTag. All 5328 information is stored in the datasets_information member 5343 self.datasets_information = {}
5344 self.logger.info(
"Collecting information for all datasets to process")
5345 dataset_names = self.datasets_to_use.keys()
5346 dataset_names.sort()
5347 for dataset_name
in dataset_names:
5351 self.logger.info(sep_line)
5352 self.logger.info(
" `%s'" % dataset_name)
5353 self.logger.info(sep_line)
5355 runs = self.dbs_resolve_runs(dataset_name)
5356 self.logger.info(
" found %d run(s)" % len(runs))
5358 self.logger.debug(
" run number(s): %s" % \
5359 ", ".
join([
str(i)
for i
in runs]))
5363 self.logger.warning(
" --> skipping dataset " 5365 assert False,
"Panic: found a dataset without runs " \
5369 cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5370 self.logger.info(
" found CMSSW version `%s'" % cmssw_version)
5373 datatype = self.dbs_resolve_datatype(dataset_name)
5374 self.logger.info(
" sample is data or MC? --> %s" % \
5380 if self.globaltag
is None:
5381 globaltag = self.dbs_resolve_globaltag(dataset_name)
5383 globaltag = self.globaltag
5385 self.logger.info(
" found GlobalTag `%s'" % globaltag)
5391 assert datatype ==
"data", \
5392 "ERROR Empty GlobalTag for MC dataset!!!" 5400 sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5404 for run_number
in sites_catalog.keys():
5405 num_events[run_number] = sites_catalog \
5406 [run_number][
"all_sites"]
5407 del sites_catalog[run_number][
"all_sites"]
5412 for run_number
in sites_catalog.keys():
5413 mirror_catalog[run_number] = sites_catalog \
5414 [run_number][
"mirrored"]
5415 del sites_catalog[run_number][
"mirrored"]
5444 self.datasets_information[dataset_name] = {}
5445 self.datasets_information[dataset_name][
"runs"] = runs
5446 self.datasets_information[dataset_name][
"cmssw_version"] = \
5448 self.datasets_information[dataset_name][
"globaltag"] = globaltag
5449 self.datasets_information[dataset_name][
"datatype"] = datatype
5450 self.datasets_information[dataset_name][
"num_events"] = num_events
5451 self.datasets_information[dataset_name][
"mirrored"] = mirror_catalog
5452 self.datasets_information[dataset_name][
"sites"] = sites_catalog
5456 castor_path_common = self.create_castor_path_name_common(dataset_name)
5457 self.logger.info(
" output will go into `%s'" % \
5461 [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5463 for path_name
in castor_paths.values():
5464 self.logger.debug(
" %s" % path_name)
5465 self.datasets_information[dataset_name][
"castor_path"] = \
5473 """Tell the user what to do now, after this part is done. 5475 This should provide the user with some (preferably 5476 copy-pasteable) instructions on what to do now with the setups 5477 and files that have been created. 5487 self.logger.info(
"")
5488 self.logger.info(sep_line)
5489 self.logger.info(
" Configuration files have been created.")
5490 self.logger.info(
" From here on please follow the usual CRAB instructions.")
5491 self.logger.info(
" Quick copy-paste instructions are shown below.")
5492 self.logger.info(sep_line)
5494 self.logger.info(
"")
5495 self.logger.info(
" Create all CRAB jobs:")
5496 self.logger.info(
" multicrab -create")
5497 self.logger.info(
"")
5498 self.logger.info(
" Submit all CRAB jobs:")
5499 self.logger.info(
" multicrab -submit")
5500 self.logger.info(
"")
5501 self.logger.info(
" Check CRAB status:")
5502 self.logger.info(
" multicrab -status")
5503 self.logger.info(
"")
5505 self.logger.info(
"")
5506 self.logger.info(
" For more information please see the CMS Twiki:")
5507 self.logger.info(
" %s" % twiki_url)
5508 self.logger.info(sep_line)
5512 if not self.all_sites_found:
5513 self.logger.warning(
" For some of the jobs no matching " \
5514 "site could be found")
5515 self.logger.warning(
" --> please scan your multicrab.cfg" \
5516 "for occurrences of `%s'." % \
5517 self.no_matching_site_found_str)
5518 self.logger.warning(
" You will have to fix those " \
5526 "Main entry point of the CMS harvester." 5536 self.parse_cmd_line_options()
5538 self.check_input_status()
5551 self.setup_harvesting_info()
5554 self.build_dataset_use_list()
5556 self.build_dataset_ignore_list()
5559 self.build_runs_use_list()
5560 self.build_runs_ignore_list()
5567 self.process_dataset_ignore_list()
5571 self.build_datasets_information()
5573 if self.use_ref_hists
and \
5574 self.ref_hist_mappings_needed():
5577 self.load_ref_hist_mappings()
5581 self.check_ref_hist_mappings()
5583 self.logger.info(
"No need to load reference " \
5584 "histogram mappings file")
5599 self.process_runs_use_and_ignore_lists()
5604 if self.harvesting_mode ==
"single-step-allow-partial":
5605 self.singlify_datasets()
5608 self.check_dataset_list()
5610 if len(self.datasets_to_use) < 1:
5611 self.logger.info(
"After all checks etc. " \
5612 "there are no datasets (left?) " \
5616 self.logger.info(
"After all checks etc. we are left " \
5617 "with %d dataset(s) to process " \
5618 "for a total of %d runs" % \
5619 (len(self.datasets_to_use),
5620 sum([len(i)
for i
in \
5621 self.datasets_to_use.values()])))
5648 self.create_and_check_castor_dirs()
5652 self.write_crab_config()
5653 self.write_multicrab_config()
5663 for dataset_name
in self.datasets_to_use.keys():
5665 self.write_harvesting_config(dataset_name)
5666 if self.harvesting_mode ==
"two-step":
5667 self.write_me_extraction_config(dataset_name)
5675 for run_number
in self.datasets_to_use[dataset_name]:
5676 tmp[run_number] = self.datasets_information \
5677 [dataset_name][
"num_events"][run_number]
5678 if dataset_name
in self.book_keeping_information:
5679 self.book_keeping_information[dataset_name].
update(tmp)
5681 self.book_keeping_information[dataset_name] = tmp
5684 self.show_exit_message()
5686 except Usage
as err:
5691 except Error
as err:
5695 except Exception
as err:
5704 if isinstance(err, SystemExit):
5705 self.logger.fatal(err.code)
5706 elif not isinstance(err, KeyboardInterrupt):
5707 self.logger.fatal(
"!" * 50)
5708 self.logger.fatal(
" This looks like a serious problem.")
5709 self.logger.fatal(
" If you are sure you followed all " \
5711 self.logger.fatal(
" please copy the below stack trace together")
5712 self.logger.fatal(
" with a description of what you were doing to")
5713 self.logger.fatal(
" jeroen.hegeman@cern.ch.")
5714 self.logger.fatal(
" %s" % self.ident_string())
5715 self.logger.fatal(
"!" * 50)
5716 self.logger.fatal(
str(err))
5718 traceback_string = traceback.format_exc()
5719 for line
in traceback_string.split(
"\n"):
5720 self.logger.fatal(line)
5721 self.logger.fatal(
"!" * 50)
5736 if self.crab_submission ==
True:
5737 os.system(
"multicrab -create")
5738 os.system(
"multicrab -submit")
if __name__ == "__main__":
    "Main entry point for harvesting."