15 """Main program to run all kinds of harvesting.
17 These are the basic kinds of harvesting implemented (contact me if
18 your favourite is missing):
20 - RelVal : Run for release validation samples. Makes heavy use of MC
23 - RelValFS: FastSim RelVal.
25 - MC : Run for MC samples.
27 - DQMOffline : Run for real data (could also be run for MC).
29 For the mappings of these harvesting types to sequence names please
30 see the setup_harvesting_info() and option_handler_list_types()
34 from __future__
import print_function
38 from builtins
import range
39 __version__ =
"3.8.2p1"
40 __author__ =
"Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
41 "Niklas Pietsch (niklas.pietsch@desy.de)"
43 twiki_url =
"https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
99 from inspect
import getargspec
100 from random
import choice
105 from DBSAPI.dbsApi
import DbsApi
108 from functools
import reduce
111 global SAXParseException
113 from xml.sax
import SAXParseException
115 import Configuration.PyReleaseValidation
116 from Configuration.PyReleaseValidation.ConfigBuilder
import \
117 ConfigBuilder, defaultOptions
136 return repr(self.
msg)
147 return repr(self.
msg)
154 """Helper class to add some customised help output to cmsHarvester.
156 We want to add some instructions, as well as a pointer to the CMS
166 usage_lines.append(sep_line)
167 usage_lines.append(
"Welcome to the CMS harvester, a (hopefully useful)")
168 usage_lines.append(
"tool to create harvesting configurations.")
169 usage_lines.append(
"For more information please have a look at the CMS Twiki:")
170 usage_lines.append(
" %s" % twiki_url)
171 usage_lines.append(sep_line)
172 usage_lines.append(
"")
176 usage_lines.append(optparse.IndentedHelpFormatter. \
179 formatted_usage =
"\n".
join(usage_lines)
180 return formatted_usage
189 """XML handler class to parse DBS results.
191 The tricky thing here is that older DBS versions (2.0.5 and
192 earlier) return results in a different XML format than newer
193 versions. Previously the result values were returned as attributes
194 to the `result' element. The new approach returns result values as
195 contents of named elements.
197 The old approach is handled directly in startElement(), the new
198 approach in characters().
200 NOTE: All results are returned in the form of string values of
211 "dataset.tag" :
"PROCESSEDDATASET_GLOBALTAG",
212 "datatype.type" :
"PRIMARYDSTYPE_TYPE",
213 "run" :
"RUNS_RUNNUMBER",
214 "run.number" :
"RUNS_RUNNUMBER",
215 "file.name" :
"FILES_LOGICALFILENAME",
216 "file.numevents" :
"FILES_NUMBEROFEVENTS",
217 "algo.version" :
"APPVERSION_VERSION",
218 "site" :
"STORAGEELEMENT_SENAME",
238 key = DBSXMLHandler.mapping[name]
239 value =
str(attrs[key])
249 "closing unopenend element `%s'" % name
272 """Make sure that all results arrays have equal length.
274 We should have received complete rows from DBS. I.e. all
275 results arrays in the handler should be of equal length.
282 if len(res_names) > 1:
283 for res_name
in res_names[1:]:
284 res_tmp = self.
results[res_name]
285 if len(res_tmp) != len(self.
results[res_names[0]]):
286 results_valid =
False
297 """Class to perform CMS harvesting.
299 More documentation `obviously' to follow.
306 "Initialize class and process command line options."
333 "single-step-allow-partial",
426 "dqm/offline/harvesting_output/"
486 if cmd_line_opts
is None:
487 cmd_line_opts = sys.argv[1:]
491 log_handler = logging.StreamHandler()
494 log_formatter = logging.Formatter(
"%(message)s")
495 log_handler.setFormatter(log_formatter)
496 logger = logging.getLogger()
498 logger.addHandler(log_handler)
510 "Clean up after ourselves."
522 "Create a timestamp to use in the created config files."
524 time_now = datetime.datetime.utcnow()
526 time_now = time_now.replace(microsecond = 0)
527 time_stamp =
"%sUTC" % datetime.datetime.isoformat(time_now)
535 "Spit out an identification string for cmsHarvester.py."
537 ident_str =
"`cmsHarvester.py " \
538 "version %s': cmsHarvester.py %s" % \
540 reduce(
lambda x, y: x+
' '+y, sys.argv[1:]))
547 """Create the conditions string needed for `cmsDriver'.
549 Just glueing the FrontierConditions bit in front of it really.
560 if globaltag.lower().
find(
"conditions") > -1:
561 conditions_string = globaltag
563 conditions_string =
"FrontierConditions_GlobalTag,%s" % \
567 return conditions_string
572 """Return the database account name used to store the GlobalTag.
574 The name of the database account depends (albeit weakly) on
575 the CMSSW release version.
581 account_name =
"CMS_COND_31X_GLOBALTAG"
589 """See db_account_name_cms_cond_globaltag."""
592 version = self.cmssw_version[6:11]
593 if version <
"3_4_0":
594 account_name =
"CMS_COND_31X_DQM_SUMMARY"
596 account_name =
"CMS_COND_34X"
604 "Create a nice header to be used to mark the generated files."
610 tmp.append(
"# %s" % time_stamp)
611 tmp.append(
"# WARNING: This file was created automatically!")
613 tmp.append(
"# Created by %s" % ident_str)
615 header =
"\n".
join(tmp)
623 """Adjust the level of output generated.
626 - normal : default level of output
627 - quiet : less output than the default
628 - verbose : some additional information
629 - debug : lots more information, may be overwhelming
631 NOTE: The debug option is a bit special in the sense that it
632 also modifies the output format.
639 "NORMAL" : logging.INFO,
640 "QUIET" : logging.WARNING,
641 "VERBOSE" : logging.INFO,
642 "DEBUG" : logging.DEBUG
645 output_level = output_level.upper()
653 self.
logger.fatal(
"Unknown output level `%s'" % ouput_level)
662 """Switch to debug mode.
664 This both increases the amount of output generated, as well as
665 changes the format used (more detailed information is given).
670 log_formatter_debug = logging.Formatter(
"[%(levelname)s] " \
680 log_handler = self.
logger.handlers[0]
681 log_handler.setFormatter(log_formatter_debug)
689 "Switch to quiet mode: less verbose."
698 """Switch on `force mode' in which case we don't brake for nobody.
700 In so-called `force mode' all sanity checks are performed but
701 we don't halt on failure. Of course this requires some care
714 """Set the harvesting type to be used.
716 This checks that no harvesting type is already set, and sets
717 the harvesting type to be used to the one specified. If a
718 harvesting type is already set an exception is thrown. The
719 same happens when an unknown type is specified.
728 value = value.lower()
731 type_index = harvesting_types_lowered.index(value)
735 self.
logger.fatal(
"Unknown harvesting type `%s'" % \
737 self.
logger.fatal(
" possible types are: %s" %
739 raise Usage(
"Unknown harvesting type `%s'" % \
745 msg =
"Only one harvesting type should be specified"
750 self.
logger.
info(
"Harvesting type to be used: `%s'" % \
758 """Set the harvesting mode to be used.
760 Single-step harvesting can be used for samples that are
761 located completely at a single site (= SE). Otherwise use
767 harvesting_mode = value.lower()
769 msg =
"Unknown harvesting mode `%s'" % harvesting_mode
771 self.
logger.fatal(
" possible modes are: %s" % \
778 msg =
"Only one harvesting mode should be specified"
783 self.
logger.
info(
"Harvesting mode to be used: `%s'" % \
791 """Set the GlobalTag to be used, overriding our own choices.
793 By default the cmsHarvester will use the GlobalTag with which
794 a given dataset was created also for the harvesting. The
795 --globaltag option is the way to override this behaviour.
801 msg =
"Only one GlobalTag should be specified"
806 self.
logger.
info(
"GlobalTag to be used: `%s'" % \
814 "Switch use of all reference histograms off."
818 self.
logger.
warning(
"Switching off all use of reference histograms")
826 """Override the default Frontier connection string.
828 Please only use this for testing (e.g. when a test payload has
829 been inserted into cms_orc_off instead of cms_orc_on).
831 This method gets called for three different command line
833 - --frontier-connection,
834 - --frontier-connection-for-globaltag,
835 - --frontier-connection-for-refhists.
836 Appropriate care has to be taken to make sure things are only
842 frontier_type = opt_str.split(
"-")[-1]
843 if frontier_type ==
"connection":
847 frontier_types = [frontier_type]
851 for connection_name
in frontier_types:
853 msg =
"Please specify either:\n" \
854 " `--frontier-connection' to change the " \
855 "Frontier connection used for everything, or\n" \
856 "either one or both of\n" \
857 " `--frontier-connection-for-globaltag' to " \
858 "change the Frontier connection used for the " \
860 " `--frontier-connection-for-refhists' to change " \
861 "the Frontier connection used for the " \
862 "reference histograms."
866 frontier_prefix =
"frontier://"
867 if not value.startswith(frontier_prefix):
868 msg =
"Expecting Frontier connections to start with " \
869 "`%s'. You specified `%s'." % \
870 (frontier_prefix, value)
875 if value.find(
"FrontierProd") < 0
and \
876 value.find(
"FrontierProd") < 0:
877 msg =
"Expecting Frontier connections to contain either " \
878 "`FrontierProd' or `FrontierPrep'. You specified " \
879 "`%s'. Are you sure?" % \
883 if not value.endswith(
"/"):
886 for connection_name
in frontier_types:
890 frontier_type_str =
"unknown"
891 if connection_name ==
"globaltag":
892 frontier_type_str =
"the GlobalTag"
893 elif connection_name ==
"refhists":
894 frontier_type_str =
"the reference histograms"
897 "connection for %s " \
935 if opt_str.lower().
find(
"ignore") > -1:
941 if opt_str.lower().
find(
"dataset") > -1:
942 select_type =
"datasets"
946 if not self.
input_method[select_type][spec_type]
is None:
947 msg =
"Please only specify one input method " \
948 "(for the `%s' case)" % opt_str
952 input_method = opt_str.replace(
"-",
"").
replace(
"ignore",
"")
953 self.
input_method[select_type][spec_type] = input_method
954 self.
input_name[select_type][spec_type] = value
956 self.
logger.
debug(
"Input method for the `%s' case: %s" % \
957 (spec_type, input_method))
964 """Store the name of the file to be used for book keeping.
966 The only check done here is that only a single book keeping
974 msg =
"Only one book keeping file should be specified"
979 self.
logger.
info(
"Book keeping file to be used: `%s'" % \
987 """Store the name of the file for the ref. histogram mapping.
994 msg =
"Only one reference histogram mapping file " \
995 "should be specified"
1000 self.
logger.
info(
"Reference histogram mapping file " \
1001 "to be used: `%s'" % \
1063 """Specify where on CASTOR the output should go.
1065 At the moment only output to CERN CASTOR is
1066 supported. Eventually the harvested results should go into the
1067 central place for DQM on CASTOR anyway.
1074 castor_prefix = self.castor_prefix
1077 castor_dir = os.path.join(os.path.sep, castor_dir)
1078 self.castor_base_dir = os.path.normpath(castor_dir)
1080 self.logger.
info(
"CASTOR (base) area to be used: `%s'" % \
1081 self.castor_base_dir)
1088 """Set the self.non_t1access flag to try and create jobs that
1089 run without special `t1access' role.
1093 self.non_t1access =
True
1095 self.logger.
warning(
"Running in `non-t1access' mode. " \
1096 "Will try to create jobs that run " \
1097 "without special rights but no " \
1098 "further promises...")
1105 """Set the self.caf_access flag to try and create jobs that
1109 self.caf_access =
True
1111 self.logger.
warning(
"Running in `caf_access' mode. " \
1112 "Will try to create jobs that run " \
1114 "further promises...")
1121 """Set process.dqmSaver.saveByLumiSection=1 in cfg harvesting file
1123 self.saveByLumiSection =
True
1125 self.logger.
warning(
"waning concerning saveByLumiSection option")
1133 """Set the self.crab_submission flag so that Crab jobs are
1134 created and submitted automatically.
1136 self.crab_submission =
True
1144 self.nr_max_sites = value
1150 self.preferred_site = value
1155 """List all harvesting types and their mappings.
1157 This lists all implemented harvesting types with their
1158 corresponding mappings to sequence names. This had to be
1159 separated out from the help since it depends on the CMSSW
1160 version and was making things a bit of a mess.
1162 NOTE: There is no way (at least not that I could come up with)
1163 to code this in a neat generic way that can be read both by
1164 this method and by setup_harvesting_info(). Please try hard to
1165 keep these two methods in sync!
1170 sep_line_short =
"-" * 20
1173 print(
"The following harvesting types are available:")
1176 print(
"`RelVal' maps to:")
1177 print(
" pre-3_3_0 : HARVESTING:validationHarvesting")
1178 print(
" 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting")
1179 print(
" Exceptions:")
1180 print(
" 3_3_0_pre1-4 : HARVESTING:validationHarvesting")
1181 print(
" 3_3_0_pre6 : HARVESTING:validationHarvesting")
1182 print(
" 3_4_0_pre1 : HARVESTING:validationHarvesting")
1184 print(sep_line_short)
1186 print(
"`RelValFS' maps to:")
1187 print(
" always : HARVESTING:validationHarvestingFS")
1189 print(sep_line_short)
1191 print(
"`MC' maps to:")
1192 print(
" always : HARVESTING:validationprodHarvesting")
1194 print(sep_line_short)
1196 print(
"`DQMOffline' maps to:")
1197 print(
" always : HARVESTING:dqmHarvesting")
1210 """Fill our dictionary with all info needed to understand
1213 This depends on the CMSSW version since at some point the
1214 names and sequences were modified.
1216 NOTE: There is no way (at least not that I could come up with)
1217 to code this in a neat generic way that can be read both by
1218 this method and by option_handler_list_types(). Please try
1219 hard to keep these two methods in sync!
1223 assert not self.cmssw_version
is None, \
1224 "ERROR setup_harvesting() requires " \
1225 "self.cmssw_version to be set!!!"
1227 harvesting_info = {}
1230 harvesting_info[
"DQMOffline"] = {}
1231 harvesting_info[
"DQMOffline"][
"beamspot"] =
None
1232 harvesting_info[
"DQMOffline"][
"eventcontent"] =
None
1233 harvesting_info[
"DQMOffline"][
"harvesting"] =
"AtRunEnd"
1235 harvesting_info[
"RelVal"] = {}
1236 harvesting_info[
"RelVal"][
"beamspot"] =
None
1237 harvesting_info[
"RelVal"][
"eventcontent"] =
None
1238 harvesting_info[
"RelVal"][
"harvesting"] =
"AtRunEnd"
1240 harvesting_info[
"RelValFS"] = {}
1241 harvesting_info[
"RelValFS"][
"beamspot"] =
None
1242 harvesting_info[
"RelValFS"][
"eventcontent"] =
None
1243 harvesting_info[
"RelValFS"][
"harvesting"] =
"AtRunEnd"
1245 harvesting_info[
"MC"] = {}
1246 harvesting_info[
"MC"][
"beamspot"] =
None
1247 harvesting_info[
"MC"][
"eventcontent"] =
None
1248 harvesting_info[
"MC"][
"harvesting"] =
"AtRunEnd"
1258 assert self.cmssw_version.startswith(
"CMSSW_")
1261 version = self.cmssw_version[6:]
1267 if version <
"3_3_0":
1268 step_string =
"validationHarvesting"
1269 elif version
in [
"3_3_0_pre1",
"3_3_0_pre2",
1270 "3_3_0_pre3",
"3_3_0_pre4",
1271 "3_3_0_pre6",
"3_4_0_pre1"]:
1272 step_string =
"validationHarvesting"
1274 step_string =
"validationHarvesting+dqmHarvesting"
1276 harvesting_info[
"RelVal"][
"step_string"] = step_string
1280 assert not step_string
is None, \
1281 "ERROR Could not decide a RelVal harvesting sequence " \
1282 "for CMSSW version %s" % self.cmssw_version
1288 step_string =
"validationHarvestingFS"
1290 harvesting_info[
"RelValFS"][
"step_string"] = step_string
1295 step_string =
"validationprodHarvesting"
1297 harvesting_info[
"MC"][
"step_string"] = step_string
1301 assert not step_string
is None, \
1302 "ERROR Could not decide a MC harvesting " \
1303 "sequence for CMSSW version %s" % self.cmssw_version
1309 step_string =
"dqmHarvesting"
1311 harvesting_info[
"DQMOffline"][
"step_string"] = step_string
1315 self.harvesting_info = harvesting_info
1317 self.logger.
info(
"Based on the CMSSW version (%s) " \
1318 "I decided to use the `HARVESTING:%s' " \
1319 "sequence for %s harvesting" % \
1320 (self.cmssw_version,
1321 self.harvesting_info[self.harvesting_type][
"step_string"],
1322 self.harvesting_type))
1329 """Build the common part of the output path to be used on
1332 This consists of the CASTOR area base path specified by the
1333 user and a piece depending on the data type (data vs. MC), the
1334 harvesting type and the dataset name followed by a piece
1335 containing the run number and event count. (See comments in
1336 create_castor_path_name_special for details.) This method
1337 creates the common part, without run number and event count.
1341 castor_path = self.castor_base_dir
1346 datatype = self.datasets_information[dataset_name][
"datatype"]
1347 datatype = datatype.lower()
1348 castor_path = os.path.join(castor_path, datatype)
1351 harvesting_type = self.harvesting_type
1352 harvesting_type = harvesting_type.lower()
1353 castor_path = os.path.join(castor_path, harvesting_type)
1363 release_version = self.cmssw_version
1364 release_version = release_version.lower(). \
1367 castor_path = os.path.join(castor_path, release_version)
1370 dataset_name_escaped = self.escape_dataset_name(dataset_name)
1371 castor_path = os.path.join(castor_path, dataset_name_escaped)
1375 castor_path = os.path.normpath(castor_path)
1383 dataset_name, run_number,
1384 castor_path_common):
1385 """Create the specialised part of the CASTOR output dir name.
1387 NOTE: To avoid clashes with `incremental harvesting'
1388 (re-harvesting when a dataset grows) we have to include the
1389 event count in the path name. The underlying `problem' is that
1390 CRAB does not overwrite existing output files so if the output
1391 file already exists CRAB will fail to copy back the output.
1393 NOTE: It's not possible to create different kinds of
1394 harvesting jobs in a single call to this tool. However, in
1395 principle it could be possible to create both data and MC jobs
1398 NOTE: The number of events used in the path name is the
1399 _total_ number of events in the dataset/run at the time of
1400 harvesting. If we're doing partial harvesting the final
1401 results will reflect lower statistics. This is a) the easiest
1402 to code and b) the least likely to lead to confusion if
1403 someone ever decides to swap/copy around file blocks between
1408 castor_path = castor_path_common
1413 castor_path = os.path.join(castor_path,
"run_%d" % run_number)
1421 castor_path = os.path.join(castor_path,
"nevents")
1425 castor_path = os.path.normpath(castor_path)
1433 """Make sure all required CASTOR output dirs exist.
1435 This checks the CASTOR base dir specified by the user as well
1436 as all the subdirs required by the current set of jobs.
1440 self.logger.
info(
"Checking (and if necessary creating) CASTOR " \
1441 "output area(s)...")
1444 self.create_and_check_castor_dir(self.castor_base_dir)
1448 for (dataset_name, runs)
in six.iteritems(self.datasets_to_use):
1451 castor_dirs.append(self.datasets_information[dataset_name] \
1452 [
"castor_path"][run])
1453 castor_dirs_unique = sorted(set(castor_dirs))
1457 ndirs = len(castor_dirs_unique)
1458 step =
max(ndirs / 10, 1)
1459 for (i, castor_dir)
in enumerate(castor_dirs_unique):
1460 if (i + 1) % step == 0
or \
1462 self.logger.
info(
" %d/%d" % \
1464 self.create_and_check_castor_dir(castor_dir)
1471 self.logger.
debug(
"Checking if path `%s' is empty" % \
1473 cmd =
"rfdir %s" % castor_dir
1474 (status, output) = commands.getstatusoutput(cmd)
1476 msg =
"Could not access directory `%s'" \
1477 " !!! This is bad since I should have just" \
1478 " created it !!!" % castor_dir
1479 self.logger.fatal(msg)
1482 self.logger.
warning(
"Output directory `%s' is not empty:" \
1483 " new jobs will fail to" \
1484 " copy back output" % \
1492 """Check existence of the given CASTOR dir, if necessary create
1495 Some special care has to be taken with several things like
1496 setting the correct permissions such that CRAB can store the
1497 output results. Of course this means that things like
1498 /castor/cern.ch/ and user/j/ have to be recognised and treated
1501 NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1504 NOTE: This method uses some slightly tricky caching to make
1505 sure we don't keep over and over checking the same base paths.
1512 def split_completely(path):
1513 (parent_path, name) = os.path.split(path)
1515 return (parent_path, )
1517 return split_completely(parent_path) + (name, )
1523 def extract_permissions(rfstat_output):
1524 """Parse the output from rfstat and return the
1525 5-digit permissions string."""
1527 permissions_line = [i
for i
in output.split(
"\n") \
1528 if i.lower().
find(
"protection") > -1]
1529 regexp = re.compile(
".*\(([0123456789]{5})\).*")
1530 match = regexp.search(rfstat_output)
1531 if not match
or len(match.groups()) != 1:
1532 msg =
"Could not extract permissions " \
1533 "from output: %s" % rfstat_output
1534 self.logger.fatal(msg)
1536 permissions = match.group(1)
1553 castor_paths_dont_touch = {
1554 0: [
"/",
"castor",
"cern.ch",
"cms",
"store",
"temp",
1555 "dqm",
"offline",
"user"],
1556 -1: [
"user",
"store"]
1559 self.logger.
debug(
"Checking CASTOR path `%s'" % castor_dir)
1564 castor_path_pieces = split_completely(castor_dir)
1570 check_sizes = sorted(castor_paths_dont_touch.keys())
1571 len_castor_path_pieces = len(castor_path_pieces)
1572 for piece_index
in range (len_castor_path_pieces):
1573 skip_this_path_piece =
False
1574 piece = castor_path_pieces[piece_index]
1577 for check_size
in check_sizes:
1579 if (piece_index + check_size) > -1:
1583 if castor_path_pieces[piece_index + check_size]
in castor_paths_dont_touch[check_size]:
1585 skip_this_path_piece =
True
1593 path = os.path.join(path, piece)
1600 if path
in self.castor_path_checks_cache:
1602 except AttributeError:
1604 self.castor_path_checks_cache = []
1605 self.castor_path_checks_cache.
append(path)
1623 if not skip_this_path_piece:
1631 self.logger.
debug(
"Checking if path `%s' exists" % \
1633 cmd =
"rfstat %s" % path
1634 (status, output) = commands.getstatusoutput(cmd)
1637 self.logger.
debug(
"Creating path `%s'" % path)
1638 cmd =
"nsmkdir -m 775 %s" % path
1639 (status, output) = commands.getstatusoutput(cmd)
1641 msg =
"Could not create directory `%s'" % path
1642 self.logger.fatal(msg)
1644 cmd =
"rfstat %s" % path
1645 (status, output) = commands.getstatusoutput(cmd)
1650 permissions = extract_permissions(output)
1651 if not permissions.startswith(
"40"):
1652 msg =
"Path `%s' is not a directory(?)" % path
1653 self.logger.fatal(msg)
1658 self.logger.
debug(
"Checking permissions for path `%s'" % path)
1659 cmd =
"rfstat %s" % path
1660 (status, output) = commands.getstatusoutput(cmd)
1662 msg =
"Could not obtain permissions for directory `%s'" % \
1664 self.logger.fatal(msg)
1667 permissions = extract_permissions(output)[-3:]
1671 if piece_index == (len_castor_path_pieces - 1):
1674 permissions_target =
"775"
1677 permissions_target =
"775"
1680 permissions_new = []
1681 for (i, j)
in zip(permissions, permissions_target):
1683 permissions_new =
"".
join(permissions_new)
1684 self.logger.
debug(
" current permissions: %s" % \
1686 self.logger.
debug(
" target permissions : %s" % \
1688 if permissions_new != permissions:
1690 self.logger.
debug(
"Changing permissions of `%s' " \
1691 "to %s (were %s)" % \
1692 (path, permissions_new, permissions))
1693 cmd =
"rfchmod %s %s" % (permissions_new, path)
1694 (status, output) = commands.getstatusoutput(cmd)
1696 msg =
"Could not change permissions for path `%s' " \
1697 "to %s" % (path, permissions_new)
1698 self.logger.fatal(msg)
1701 self.logger.
debug(
" Permissions ok (%s)" % permissions_new)
1710 sites_forbidden = []
1712 if (self.preferred_site ==
"CAF")
or (self.preferred_site ==
"caf.cern.ch"):
1713 self.caf_access =
True
1715 if self.caf_access ==
False:
1716 sites_forbidden.append(
"caf.cern.ch")
1727 "cmssrm-fzk.gridka.de",
1729 "gridka-dCache.fzk.de",
1730 "srm-cms.gridpp.rl.ac.uk",
1731 "srm.grid.sinica.edu.tw",
1732 "srm2.grid.sinica.edu.tw",
1734 "storm-fe-cms.cr.cnaf.infn.it"
1738 "CAF" :
"caf.cern.ch",
1739 "CH" :
"srm-cms.cern.ch",
1740 "FR" :
"ccsrm.in2p3.fr",
1741 "DE" :
"cmssrm-fzk.gridka.de",
1742 "GOV" :
"cmssrm.fnal.gov",
1743 "DE2" :
"gridka-dCache.fzk.de",
1744 "UK" :
"srm-cms.gridpp.rl.ac.uk",
1745 "TW" :
"srm.grid.sinica.edu.tw",
1746 "TW2" :
"srm2.grid.sinica.edu.tw",
1747 "ES" :
"srmcms.pic.es",
1748 "IT" :
"storm-fe-cms.cr.cnaf.infn.it"
1751 if self.non_t1access:
1752 sites_forbidden.extend(all_t1)
1754 for site
in sites_forbidden:
1758 if self.preferred_site
in country_codes:
1759 self.preferred_site = country_codes[self.preferred_site]
1761 if self.preferred_site !=
"no preference":
1762 if self.preferred_site
in sites:
1763 sites = [self.preferred_site]
1776 while len(sites) > 0
and \
1783 t1_sites.append(site)
1784 if site ==
"caf.cern.ch":
1785 t1_sites.append(site)
1792 if len(t1_sites) > 0:
1793 se_name = choice(t1_sites)
1796 se_name = choice(sites)
1800 if se_name
in self.sites_and_versions_cache
and \
1801 cmssw_version
in self.sites_and_versions_cache[se_name]:
1802 if self.sites_and_versions_cache[se_name][cmssw_version]:
1806 self.logger.
debug(
" --> rejecting site `%s'" % se_name)
1807 sites.remove(se_name)
1810 self.logger.
info(
"Checking if site `%s' " \
1811 "has CMSSW version `%s'" % \
1812 (se_name, cmssw_version))
1813 self.sites_and_versions_cache[se_name] = {}
1828 cmd =
"lcg-info --list-ce " \
1831 "CEStatus=Production," \
1833 (cmssw_version, se_name)
1834 (status, output) = commands.getstatusoutput(cmd)
1836 self.logger.
error(
"Could not check site information " \
1837 "for site `%s'" % se_name)
1839 if (len(output) > 0)
or (se_name ==
"caf.cern.ch"):
1840 self.sites_and_versions_cache[se_name][cmssw_version] =
True
1844 self.sites_and_versions_cache[se_name][cmssw_version] =
False
1845 self.logger.
debug(
" --> rejecting site `%s'" % se_name)
1846 sites.remove(se_name)
1848 if site_name
is self.no_matching_site_found_str:
1849 self.logger.
error(
" --> no matching site found")
1850 self.logger.
error(
" --> Your release or SCRAM " \
1851 "architecture may not be available" \
1852 "anywhere on the (LCG) grid.")
1854 self.logger.
debug(
" (command used: `%s')" % cmd)
1856 self.logger.
debug(
" --> selected site `%s'" % site_name)
1860 if site_name
is None:
1861 site_name = self.no_matching_site_found_str
1864 self.all_sites_found =
False
1876 parser = optparse.OptionParser(version=
"%s %s" % \
1877 (
"%prog", self.version),
1880 self.option_parser = parser
1883 parser.add_option(
"-d",
"--debug",
1884 help=
"Switch on debug mode",
1886 callback=self.option_handler_debug)
1889 parser.add_option(
"-q",
"--quiet",
1890 help=
"Be less verbose",
1892 callback=self.option_handler_quiet)
1896 parser.add_option(
"",
"--force",
1897 help=
"Force mode. Do not abort on sanity check "
1900 callback=self.option_handler_force)
1903 parser.add_option(
"",
"--harvesting_type",
1904 help=
"Harvesting type: %s" % \
1905 ", ".
join(self.harvesting_types),
1907 callback=self.option_handler_harvesting_type,
1909 metavar=
"HARVESTING_TYPE")
1912 parser.add_option(
"",
"--harvesting_mode",
1913 help=
"Harvesting mode: %s (default = %s)" % \
1914 (
", ".
join(self.harvesting_modes),
1915 self.harvesting_mode_default),
1917 callback=self.option_handler_harvesting_mode,
1919 metavar=
"HARVESTING_MODE")
1922 parser.add_option(
"",
"--globaltag",
1923 help=
"GlobalTag to use. Default is the ones " \
1924 "the dataset was created with for MC, for data" \
1925 "a GlobalTag has to be specified.",
1927 callback=self.option_handler_globaltag,
1929 metavar=
"GLOBALTAG")
1932 parser.add_option(
"",
"--no-ref-hists",
1933 help=
"Don't use any reference histograms",
1935 callback=self.option_handler_no_ref_hists)
1939 parser.add_option(
"",
"--frontier-connection",
1940 help=
"Use this Frontier connection to find " \
1941 "GlobalTags and LocalTags (for reference " \
1942 "histograms).\nPlease only use this for " \
1945 callback=self.option_handler_frontier_connection,
1951 parser.add_option(
"",
"--frontier-connection-for-globaltag",
1952 help=
"Use this Frontier connection to find " \
1953 "GlobalTags.\nPlease only use this for " \
1956 callback=self.option_handler_frontier_connection,
1962 parser.add_option(
"",
"--frontier-connection-for-refhists",
1963 help=
"Use this Frontier connection to find " \
1964 "LocalTags (for reference " \
1965 "histograms).\nPlease only use this for " \
1968 callback=self.option_handler_frontier_connection,
1974 parser.add_option(
"",
"--dataset",
1975 help=
"Name (or regexp) of dataset(s) to process",
1978 callback=self.option_handler_input_spec,
1985 parser.add_option(
"",
"--dataset-ignore",
1986 help=
"Name (or regexp) of dataset(s) to ignore",
1988 callback=self.option_handler_input_spec,
1990 metavar=
"DATASET-IGNORE")
1994 parser.add_option(
"",
"--runs",
1995 help=
"Run number(s) to process",
1997 callback=self.option_handler_input_spec,
2003 parser.add_option(
"",
"--runs-ignore",
2004 help=
"Run number(s) to ignore",
2006 callback=self.option_handler_input_spec,
2008 metavar=
"RUNS-IGNORE")
2012 parser.add_option(
"",
"--datasetfile",
2013 help=
"File containing list of dataset names " \
2014 "(or regexps) to process",
2017 callback=self.option_handler_input_spec,
2020 metavar=
"DATASETFILE")
2024 parser.add_option(
"",
"--datasetfile-ignore",
2025 help=
"File containing list of dataset names " \
2026 "(or regexps) to ignore",
2028 callback=self.option_handler_input_spec,
2030 metavar=
"DATASETFILE-IGNORE")
2034 parser.add_option(
"",
"--runslistfile",
2035 help=
"File containing list of run numbers " \
2038 callback=self.option_handler_input_spec,
2040 metavar=
"RUNSLISTFILE")
2044 parser.add_option(
"",
"--runslistfile-ignore",
2045 help=
"File containing list of run numbers " \
2048 callback=self.option_handler_input_spec,
2050 metavar=
"RUNSLISTFILE-IGNORE")
2054 parser.add_option(
"",
"--Jsonrunfile",
2055 help=
"Jsonfile containing dictionary of run/lumisections pairs. " \
2056 "All lumisections of runs contained in dictionary are processed.",
2058 callback=self.option_handler_input_Jsonrunfile,
2060 metavar=
"JSONRUNFILE")
2064 parser.add_option(
"",
"--Jsonfile",
2065 help=
"Jsonfile containing dictionary of run/lumisections pairs. " \
2066 "Only specified lumisections of runs contained in dictionary are processed.",
2068 callback=self.option_handler_input_Jsonfile,
2074 parser.add_option(
"",
"--todo-file",
2075 help=
"Todo file containing a list of runs to process.",
2077 callback=self.option_handler_input_todofile,
2079 metavar=
"TODO-FILE")
2083 parser.add_option(
"",
"--refhistmappingfile",
2084 help=
"File to be use for the reference " \
2085 "histogram mappings. Default: `%s'." % \
2086 self.ref_hist_mappings_file_name_default,
2088 callback=self.option_handler_ref_hist_mapping_file,
2090 metavar=
"REFHISTMAPPING-FILE")
2095 parser.add_option(
"",
"--castordir",
2096 help=
"Place on CASTOR to store results. " \
2097 "Default: `%s'." % \
2098 self.castor_base_dir_default,
2100 callback=self.option_handler_castor_dir,
2102 metavar=
"CASTORDIR")
2106 parser.add_option(
"",
"--no-t1access",
2107 help=
"Try to create jobs that will run " \
2108 "without special `t1access' role",
2110 callback=self.option_handler_no_t1access)
2113 parser.add_option(
"",
"--caf-access",
2114 help=
"Crab jobs may run " \
2117 callback=self.option_handler_caf_access)
2120 parser.add_option(
"",
"--saveByLumiSection",
2121 help=
"set saveByLumiSection=1 in harvesting cfg file",
2123 callback=self.option_handler_saveByLumiSection)
2126 parser.add_option(
"",
"--automatic-crab-submission",
2127 help=
"Crab jobs are created and " \
2128 "submitted automatically",
2130 callback=self.option_handler_crab_submission)
2134 parser.add_option(
"",
"--max-sites",
2135 help=
"Max. number of sites each job is submitted to",
2137 callback=self.option_handler_sites,
2141 parser.add_option(
"",
"--site",
2142 help=
"Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
2143 srm-cms.cern.ch : CH \
2144 ccsrm.in2p3.fr : FR \
2145 cmssrm-fzk.gridka.de : DE \
2146 cmssrm.fnal.gov : GOV \
2147 gridka-dCache.fzk.de : DE2 \
2148 rm-cms.gridpp.rl.ac.uk : UK \
2149 srm.grid.sinica.edu.tw : TW \
2150 srm2.grid.sinica.edu.tw : TW2 \
2151 srmcms.pic.es : ES \
2152 storm-fe-cms.cr.cnaf.infn.it : IT",
2154 callback=self.option_handler_preferred_site,
2159 parser.add_option(
"-l",
"--list",
2160 help=
"List all harvesting types and their" \
2161 "corresponding sequence names",
2163 callback=self.option_handler_list_types)
2169 if len(self.cmd_line_opts) < 1:
2170 self.cmd_line_opts = [
"--help"]
2179 for i
in [
"-d",
"--debug",
2181 if i
in self.cmd_line_opts:
2182 self.cmd_line_opts.
remove(i)
2183 self.cmd_line_opts.
insert(0, i)
2186 parser.set_defaults()
2187 (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2194 """Check completeness and correctness of input information.
2196 Check that all required information has been specified and
2197 that, at least as far as can be easily checked, it makes
2200 NOTE: This is also where any default values are applied.
2204 self.logger.
info(
"Checking completeness/correctness of input...")
2208 if len(self.args) > 0:
2209 msg =
"Sorry but I don't understand `%s'" % \
2210 (
" ".
join(self.args))
2211 self.logger.fatal(msg)
2217 if self.harvesting_mode ==
"two-step":
2218 msg =
"--------------------\n" \
2219 " Sorry, but for the moment (well, till it works)" \
2220 " the two-step mode has been disabled.\n" \
2221 "--------------------\n"
2222 self.logger.fatal(msg)
2227 if self.harvesting_type
is None:
2228 msg =
"Please specify a harvesting type"
2229 self.logger.fatal(msg)
2232 if self.harvesting_mode
is None:
2233 self.harvesting_mode = self.harvesting_mode_default
2234 msg =
"No harvesting mode specified --> using default `%s'" % \
2235 self.harvesting_mode
2242 if self.input_method[
"datasets"][
"use"]
is None:
2243 msg =
"Please specify an input dataset name " \
2244 "or a list file name"
2245 self.logger.fatal(msg)
2250 assert not self.input_name[
"datasets"][
"use"]
is None
2257 if self.use_ref_hists:
2258 if self.ref_hist_mappings_file_name
is None:
2259 self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2260 msg =
"No reference histogram mapping file specified --> " \
2261 "using default `%s'" % \
2262 self.ref_hist_mappings_file_name
2269 if self.castor_base_dir
is None:
2270 self.castor_base_dir = self.castor_base_dir_default
2271 msg =
"No CASTOR area specified -> using default `%s'" % \
2272 self.castor_base_dir
2277 if not self.castor_base_dir.startswith(self.castor_prefix):
2278 msg =
"CASTOR area does not start with `%s'" % \
2280 self.logger.fatal(msg)
2281 if self.castor_base_dir.
find(
"castor") > -1
and \
2282 not self.castor_base_dir.
find(
"cern.ch") > -1:
2283 self.logger.fatal(
"Only CERN CASTOR is supported")
2294 if self.globaltag
is None:
2295 self.logger.
warning(
"No GlobalTag specified. This means I cannot")
2296 self.logger.
warning(
"run on data, only on MC.")
2297 self.logger.
warning(
"I will skip all data datasets.")
2302 if not self.globaltag
is None:
2303 if not self.globaltag.endswith(
"::All"):
2304 self.logger.
warning(
"Specified GlobalTag `%s' does " \
2305 "not end in `::All' --> " \
2306 "appending this missing piece" % \
2308 self.globaltag =
"%s::All" % self.globaltag
2313 for (key, value)
in six.iteritems(self.frontier_connection_name):
2314 frontier_type_str =
"unknown"
2315 if key ==
"globaltag":
2316 frontier_type_str =
"the GlobalTag"
2317 elif key ==
"refhists":
2318 frontier_type_str =
"the reference histograms"
2320 if self.frontier_connection_overridden[key] ==
True:
2324 self.logger.
info(
"Using %sdefault Frontier " \
2325 "connection for %s: `%s'" % \
2326 (non_str, frontier_type_str, value))
2335 """Check if CMSSW is setup.
2342 cmssw_version = os.getenv(
"CMSSW_VERSION")
2343 if cmssw_version
is None:
2344 self.logger.fatal(
"It seems CMSSW is not setup...")
2345 self.logger.fatal(
"($CMSSW_VERSION is empty)")
2346 raise Error(
"ERROR: CMSSW needs to be setup first!")
2348 self.cmssw_version = cmssw_version
2349 self.logger.
info(
"Found CMSSW version %s properly set up" % \
2358 """Check if DBS is setup.
2365 dbs_home = os.getenv(
"DBSCMD_HOME")
2366 if dbs_home
is None:
2367 self.logger.fatal(
"It seems DBS is not setup...")
2368 self.logger.fatal(
" $DBSCMD_HOME is empty")
2369 raise Error(
"ERROR: DBS needs to be setup first!")
2387 self.logger.
debug(
"Found DBS properly set up")
2395 """Setup the Python side of DBS.
2397 For more information see the DBS Python API documentation:
2398 https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2404 args[
"url"]=
"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2405 "servlet/DBSServlet"
2409 except DBSAPI.dbsApiException.DbsApiException
as ex:
2410 self.logger.fatal(
"Caught DBS API exception %s: %s " % \
2411 (ex.getClassName(), ex.getErrorMessage()))
2412 if ex.getErrorCode()
not in (
None,
""):
2413 logger.debug(
"DBS exception error code: ", ex.getErrorCode())
2421 """Use DBS to resolve a wildcarded dataset name.
2427 assert not self.dbs_api
is None
2432 if not (dataset_name.startswith(
"/")
and \
2433 dataset_name.endswith(
"RECO")):
2434 self.logger.
warning(
"Dataset name `%s' does not sound " \
2435 "like a valid dataset name!" % \
2441 dbs_query =
"find dataset where dataset like %s " \
2442 "and dataset.status = VALID" % \
2445 api_result = api.executeQuery(dbs_query)
2446 except DBSAPI.dbsApiException.DbsApiException:
2447 msg =
"ERROR: Could not execute DBS query"
2448 self.logger.fatal(msg)
2453 parser = xml.sax.make_parser()
2454 parser.setContentHandler(handler)
2458 xml.sax.parseString(api_result, handler)
2459 except SAXParseException:
2460 msg =
"ERROR: Could not parse DBS server output"
2461 self.logger.fatal(msg)
2465 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2469 datasets = handler.results.values()[0]
2477 """Ask DBS for the CMSSW version used to create this dataset.
2483 assert not self.dbs_api
is None
2487 dbs_query =
"find algo.version where dataset = %s " \
2488 "and dataset.status = VALID" % \
2491 api_result = api.executeQuery(dbs_query)
2492 except DBSAPI.dbsApiException.DbsApiException:
2493 msg =
"ERROR: Could not execute DBS query"
2494 self.logger.fatal(msg)
2498 parser = xml.sax.make_parser()
2499 parser.setContentHandler(handler)
2502 xml.sax.parseString(api_result, handler)
2503 except SAXParseException:
2504 msg =
"ERROR: Could not parse DBS server output"
2505 self.logger.fatal(msg)
2509 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2512 cmssw_version = handler.results.values()[0]
2515 assert len(cmssw_version) == 1
2518 cmssw_version = cmssw_version[0]
2521 return cmssw_version
2571 """Ask DBS for the list of runs in a given dataset.
2573 # NOTE: This does not (yet?) skip/remove empty runs. There is
2574 # a bug in the DBS entry run.numevents (i.e. it always returns
2575 # zero) which should be fixed in the `next DBS release'.
2577 # https://savannah.cern.ch/bugs/?53452
2578 # https://savannah.cern.ch/bugs/?53711
2589 assert not self.dbs_api
is None
2593 dbs_query =
"find run where dataset = %s " \
2594 "and dataset.status = VALID" % \
2597 api_result = api.executeQuery(dbs_query)
2598 except DBSAPI.dbsApiException.DbsApiException:
2599 msg =
"ERROR: Could not execute DBS query"
2600 self.logger.fatal(msg)
2604 parser = xml.sax.make_parser()
2605 parser.setContentHandler(handler)
2608 xml.sax.parseString(api_result, handler)
2609 except SAXParseException:
2610 msg =
"ERROR: Could not parse DBS server output"
2611 self.logger.fatal(msg)
2615 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2618 runs = handler.results.values()[0]
2620 runs = sorted([
int(i)
for i
in runs])
2628 """Ask DBS for the globaltag corresponding to a given dataset.
2631 # This does not seem to work for data datasets? E.g. for
2632 # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
2633 # Probably due to the fact that the GlobalTag changed during
2641 assert not self.dbs_api
is None
2645 dbs_query =
"find dataset.tag where dataset = %s " \
2646 "and dataset.status = VALID" % \
2649 api_result = api.executeQuery(dbs_query)
2650 except DBSAPI.dbsApiException.DbsApiException:
2651 msg =
"ERROR: Could not execute DBS query"
2652 self.logger.fatal(msg)
2656 parser = xml.sax.make_parser()
2657 parser.setContentHandler(parser)
2660 xml.sax.parseString(api_result, handler)
2661 except SAXParseException:
2662 msg =
"ERROR: Could not parse DBS server output"
2663 self.logger.fatal(msg)
2667 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2670 globaltag = handler.results.values()[0]
2673 assert len(globaltag) == 1
2676 globaltag = globaltag[0]
2684 """Ask DBS for the data type (data or mc) of a given
2691 assert not self.dbs_api
is None
2695 dbs_query =
"find datatype.type where dataset = %s " \
2696 "and dataset.status = VALID" % \
2699 api_result = api.executeQuery(dbs_query)
2700 except DBSAPI.dbsApiException.DbsApiException:
2701 msg =
"ERROR: Could not execute DBS query"
2702 self.logger.fatal(msg)
2706 parser = xml.sax.make_parser()
2707 parser.setContentHandler(handler)
2710 xml.sax.parseString(api_result, handler)
2711 except SAXParseException:
2712 msg =
"ERROR: Could not parse DBS server output"
2713 self.logger.fatal(msg)
2717 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2720 datatype = handler.results.values()[0]
2723 assert len(datatype) == 1
2726 datatype = datatype[0]
2737 """Determine the number of events in a given dataset (and run).
2739 Ask DBS for the number of events in a dataset. If a run number
2740 is specified the number of events returned is that in that run
2741 of that dataset. If problems occur we throw an exception.
2744 # Since DBS does not return the number of events correctly,
2745 # neither for runs nor for whole datasets, we have to work
2746 # around that a bit...
2753 assert not self.dbs_api
is None
2757 dbs_query =
"find file.name, file.numevents where dataset = %s " \
2758 "and dataset.status = VALID" % \
2760 if not run_number
is None:
2761 dbs_query = dbq_query + (
" and run = %d" % run_number)
2763 api_result = api.executeQuery(dbs_query)
2764 except DBSAPI.dbsApiException.DbsApiException:
2765 msg =
"ERROR: Could not execute DBS query"
2766 self.logger.fatal(msg)
2770 parser = xml.sax.make_parser()
2771 parser.setContentHandler(handler)
2774 xml.sax.parseString(api_result, handler)
2775 except SAXParseException:
2776 msg =
"ERROR: Could not parse DBS server output"
2777 self.logger.fatal(msg)
2781 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2784 num_events = sum(handler.results[
"file.numevents"])
3078 """Figure out the number of events in each run of this dataset.
3080 This is a more efficient way of doing this than calling
3081 dbs_resolve_number_of_events for each run.
3085 self.logger.
debug(
"Checking spread of dataset `%s'" % dataset_name)
3089 assert not self.dbs_api
is None
3093 dbs_query =
"find run.number, site, file.name, file.numevents " \
3094 "where dataset = %s " \
3095 "and dataset.status = VALID" % \
3098 api_result = api.executeQuery(dbs_query)
3099 except DBSAPI.dbsApiException.DbsApiException:
3100 msg =
"ERROR: Could not execute DBS query"
3101 self.logger.fatal(msg)
3104 handler =
DBSXMLHandler([
"run.number",
"site",
"file.name",
"file.numevents"])
3105 parser = xml.sax.make_parser()
3106 parser.setContentHandler(handler)
3149 xml.sax.parseString(api_result, handler)
3150 except SAXParseException:
3151 msg =
"ERROR: Could not parse DBS server output"
3152 self.logger.fatal(msg)
3156 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
3163 for (index, site_name)
in enumerate(handler.results[
"site"]):
3173 if len(site_name) < 1:
3175 run_number =
int(handler.results[
"run.number"][index])
3176 file_name = handler.results[
"file.name"][index]
3177 nevents =
int(handler.results[
"file.numevents"][index])
3180 if run_number
not in files_info:
3182 files_info[run_number] = {}
3183 files_info[run_number][file_name] = (nevents,
3185 elif file_name
not in files_info[run_number]:
3187 files_info[run_number][file_name] = (nevents,
3194 assert nevents == files_info[run_number][file_name][0]
3196 files_info[run_number][file_name][1].
append(site_name)
3201 for run_number
in files_info.keys():
3202 files_without_sites = [i
for (i, j)
in \
3203 files_info[run_number].
items() \
3205 if len(files_without_sites) > 0:
3206 self.logger.
warning(
"Removing %d file(s)" \
3207 " with empty site names" % \
3208 len(files_without_sites))
3209 for file_name
in files_without_sites:
3210 del files_info[run_number][file_name]
3216 num_events_catalog = {}
3217 for run_number
in files_info.keys():
3218 site_names =
list(set([j
for i
in files_info[run_number].
values()
for j
in i[1]]))
3224 if len(site_names) > 1:
3232 all_file_names = files_info[run_number].
keys()
3233 all_file_names = set(all_file_names)
3234 sites_with_complete_copies = []
3235 for site_name
in site_names:
3236 files_at_site = [i
for (i, (j, k)) \
3237 in files_info[run_number].
items() \
3239 files_at_site = set(files_at_site)
3240 if files_at_site == all_file_names:
3241 sites_with_complete_copies.append(site_name)
3242 if len(sites_with_complete_copies) < 1:
3248 if len(sites_with_complete_copies) > 1:
3267 self.logger.
debug(
" -> run appears to be `mirrored'")
3269 self.logger.
debug(
" -> run appears to be spread-out")
3272 len(sites_with_complete_copies) != len(site_names):
3276 for (file_name, (i, sites))
in files_info[run_number].
items():
3277 complete_sites = [site
for site
in sites \
3278 if site
in sites_with_complete_copies]
3279 files_info[run_number][file_name] = (i, complete_sites)
3280 site_names = sites_with_complete_copies
3282 self.logger.
debug(
" for run #%d:" % run_number)
3283 num_events_catalog[run_number] = {}
3284 num_events_catalog[run_number][
"all_sites"] = sum([i[0]
for i
in files_info[run_number].
values()])
3285 if len(site_names) < 1:
3286 self.logger.
debug(
" run is not available at any site")
3287 self.logger.
debug(
" (but should contain %d events" % \
3288 num_events_catalog[run_number][
"all_sites"])
3290 self.logger.
debug(
" at all sites combined there are %d events" % \
3291 num_events_catalog[run_number][
"all_sites"])
3292 for site_name
in site_names:
3293 num_events_catalog[run_number][site_name] = sum([i[0]
for i
in files_info[run_number].
values()
if site_name
in i[1]])
3294 self.logger.
debug(
" at site `%s' there are %d events" % \
3295 (site_name, num_events_catalog[run_number][site_name]))
3296 num_events_catalog[run_number][
"mirrored"] = mirrored
3299 return num_events_catalog
3359 """Build a list of all datasets to be processed.
3368 if input_method
is None:
3370 elif input_method ==
"dataset":
3375 self.logger.
info(
"Asking DBS for dataset names")
3376 dataset_names = self.dbs_resolve_dataset_name(input_name)
3377 elif input_method ==
"datasetfile":
3382 self.logger.
info(
"Reading input from list file `%s'" % \
3385 listfile = open(
"/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name,
"r")
3386 print(
"open listfile")
3387 for dataset
in listfile:
3389 dataset_stripped = dataset.strip()
3390 if len(dataset_stripped) < 1:
3393 if dataset_stripped[0] !=
"#":
3394 dataset_names.extend(self. \
3398 msg =
"ERROR: Could not open input list file `%s'" % \
3400 self.logger.fatal(msg)
3405 assert False,
"Unknown input method `%s'" % input_method
3413 dataset_names = sorted(set(dataset_names))
3417 return dataset_names
3422 """Build a list of datasets to process.
3426 self.logger.
info(
"Building list of datasets to consider...")
3428 input_method = self.input_method[
"datasets"][
"use"]
3429 input_name = self.input_name[
"datasets"][
"use"]
3430 dataset_names = self.build_dataset_list(input_method,
3432 self.datasets_to_use = dict(
list(
zip(dataset_names,
3433 [
None] * len(dataset_names))))
3435 self.logger.
info(
" found %d dataset(s) to process:" % \
3437 for dataset
in dataset_names:
3438 self.logger.
info(
" `%s'" % dataset)
3445 """Build a list of datasets to ignore.
3447 NOTE: We should always have a list of datasets to process, but
3448 it may be that we don't have a list of datasets to ignore.
3452 self.logger.
info(
"Building list of datasets to ignore...")
3454 input_method = self.input_method[
"datasets"][
"ignore"]
3455 input_name = self.input_name[
"datasets"][
"ignore"]
3456 dataset_names = self.build_dataset_list(input_method,
3458 self.datasets_to_ignore = dict(
list(
zip(dataset_names,
3459 [
None] * len(dataset_names))))
3461 self.logger.
info(
" found %d dataset(s) to ignore:" % \
3463 for dataset
in dataset_names:
3464 self.logger.
info(
" `%s'" % dataset)
3476 if input_method
is None:
3478 elif input_method ==
"runs":
3481 self.logger.
info(
"Reading list of runs from the " \
3483 runs.extend([
int(i.strip()) \
3484 for i
in input_name.split(
",") \
3485 if len(i.strip()) > 0])
3486 elif input_method ==
"runslistfile":
3488 self.logger.
info(
"Reading list of runs from file `%s'" % \
3491 listfile = open(input_name,
"r")
3492 for run
in listfile:
3494 run_stripped = run.strip()
3495 if len(run_stripped) < 1:
3498 if run_stripped[0] !=
"#":
3499 runs.append(
int(run_stripped))
3502 msg =
"ERROR: Could not open input list file `%s'" % \
3504 self.logger.fatal(msg)
3510 assert False,
"Unknown input method `%s'" % input_method
3514 runs =
list(set(runs))
3522 """Build a list of runs to process.
3526 self.logger.
info(
"Building list of runs to consider...")
3528 input_method = self.input_method[
"runs"][
"use"]
3529 input_name = self.input_name[
"runs"][
"use"]
3530 runs = self.build_runs_list(input_method, input_name)
3531 self.runs_to_use = dict(
list(
zip(runs, [
None] * len(runs))))
3533 self.logger.
info(
" found %d run(s) to process:" % \
3536 self.logger.
info(
" %s" %
", ".
join([
str(i)
for i
in runs]))
3543 """Build a list of runs to ignore.
3545 NOTE: We should always have a list of runs to process, but
3546 it may be that we don't have a list of runs to ignore.
3550 self.logger.
info(
"Building list of runs to ignore...")
3552 input_method = self.input_method[
"runs"][
"ignore"]
3553 input_name = self.input_name[
"runs"][
"ignore"]
3554 runs = self.build_runs_list(input_method, input_name)
3555 self.runs_to_ignore = dict(
list(
zip(runs, [
None] * len(runs))))
3557 self.logger.
info(
" found %d run(s) to ignore:" % \
3560 self.logger.
info(
" %s" %
", ".
join([
str(i)
for i
in runs]))
3567 """Update the list of datasets taking into account the ones to
3570 Both lists have been generated before from DBS and both are
3571 assumed to be unique.
3573 NOTE: The advantage of creating the ignore list from DBS (in
3574 case a regexp is given) and matching that instead of directly
3575 matching the ignore criterion against the list of datasets (to
3576 consider) built from DBS is that in the former case we're sure
3577 that all regexps are treated exactly as DBS would have done
3578 without the cmsHarvester.
3580 NOTE: This only removes complete samples. Exclusion of single
3581 runs is done by the book keeping. So the assumption is that a
3582 user never wants to harvest just part (i.e. n out of N runs)
3587 self.logger.
info(
"Processing list of datasets to ignore...")
3589 self.logger.
debug(
"Before processing ignore list there are %d " \
3590 "datasets in the list to be processed" % \
3591 len(self.datasets_to_use))
3594 dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3595 for dataset_name
in self.datasets_to_use.
keys():
3596 if dataset_name
in self.datasets_to_ignore.
keys():
3597 del dataset_names_filtered[dataset_name]
3599 self.logger.
info(
" --> Removed %d dataset(s)" % \
3600 (len(self.datasets_to_use) -
3601 len(dataset_names_filtered)))
3603 self.datasets_to_use = dataset_names_filtered
3605 self.logger.
debug(
"After processing ignore list there are %d " \
3606 "datasets in the list to be processed" % \
3607 len(self.datasets_to_use))
3615 self.logger.
info(
"Processing list of runs to use and ignore...")
3626 runs_to_use = self.runs_to_use
3627 runs_to_ignore = self.runs_to_ignore
3629 for dataset_name
in self.datasets_to_use:
3630 runs_in_dataset = self.datasets_information[dataset_name][
"runs"]
3633 runs_to_use_tmp = []
3634 for run
in runs_to_use:
3635 if not run
in runs_in_dataset:
3636 self.logger.
warning(
"Dataset `%s' does not contain " \
3637 "requested run %d " \
3638 "--> ignoring `use' of this run" % \
3639 (dataset_name, run))
3641 runs_to_use_tmp.append(run)
3643 if len(runs_to_use) > 0:
3644 runs = runs_to_use_tmp
3645 self.logger.
info(
"Using %d out of %d runs " \
3646 "of dataset `%s'" % \
3647 (len(runs), len(runs_in_dataset),
3650 runs = runs_in_dataset
3652 if len(runs_to_ignore) > 0:
3655 if not run
in runs_to_ignore:
3656 runs_tmp.append(run)
3657 self.logger.
info(
"Ignoring %d out of %d runs " \
3658 "of dataset `%s'" % \
3659 (len(runs)- len(runs_tmp),
3660 len(runs_in_dataset),
3664 if self.todofile !=
"YourToDofile.txt":
3666 print(
"Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile)
3667 cmd=
"grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3668 (status, output)=commands.getstatusoutput(cmd)
3671 if run_str
in output:
3672 runs_todo.append(run)
3673 self.logger.
info(
"Using %d runs " \
3674 "of dataset `%s'" % \
3680 if self.Jsonfilename !=
"YourJSON.txt":
3682 self.Jsonlumi =
True
3685 self.logger.
info(
"Reading runs and lumisections from file `%s'" % \
3688 Jsonfile = open(self.Jsonfilename,
"r")
3689 for names
in Jsonfile:
3690 dictNames= eval(
str(names))
3691 for key
in dictNames:
3693 Json_runs.append(intkey)
3696 msg =
"ERROR: Could not open Jsonfile `%s'" % \
3698 self.logger.fatal(msg)
3701 if run
in Json_runs:
3702 good_runs.append(run)
3703 self.logger.
info(
"Using %d runs " \
3704 "of dataset `%s'" % \
3708 if (self.Jsonrunfilename !=
"YourJSON.txt")
and (self.Jsonfilename ==
"YourJSON.txt"):
3712 self.logger.
info(
"Reading runs from file `%s'" % \
3713 self.Jsonrunfilename)
3715 Jsonfile = open(self.Jsonrunfilename,
"r")
3716 for names
in Jsonfile:
3717 dictNames= eval(
str(names))
3718 for key
in dictNames:
3720 Json_runs.append(intkey)
3723 msg =
"ERROR: Could not open Jsonfile `%s'" % \
3725 self.logger.fatal(msg)
3728 if run
in Json_runs:
3729 good_runs.append(run)
3730 self.logger.
info(
"Using %d runs " \
3731 "of dataset `%s'" % \
3736 self.datasets_to_use[dataset_name] = runs
3743 """Remove all but the largest part of all datasets.
3745 This allows us to harvest at least part of these datasets
3746 using single-step harvesting until the two-step approach
3752 assert self.harvesting_mode ==
"single-step-allow-partial"
3755 for dataset_name
in self.datasets_to_use:
3756 for run_number
in self.datasets_information[dataset_name][
"runs"]:
3757 max_events =
max(self.datasets_information[dataset_name][
"sites"][run_number].
values())
3758 sites_with_max_events = [i[0]
for i
in self.datasets_information[dataset_name][
"sites"][run_number].
items()
if i[1] == max_events]
3759 self.logger.
warning(
"Singlifying dataset `%s', " \
3761 (dataset_name, run_number))
3762 cmssw_version = self.datasets_information[dataset_name] \
3764 selected_site = self.pick_a_site(sites_with_max_events,
3768 nevents_old = self.datasets_information[dataset_name][
"num_events"][run_number]
3770 "only harvesting partial statistics: " \
3771 "%d out of %d events (5.1%f%%) " \
3775 100. * max_events / nevents_old,
3777 self.logger.
warning(
"!!! Please note that the number of " \
3778 "events in the output path name will " \
3779 "NOT reflect the actual statistics in " \
3780 "the harvested results !!!")
3787 self.datasets_information[dataset_name][
"sites"][run_number] = {selected_site: max_events}
3788 self.datasets_information[dataset_name][
"num_events"][run_number] = max_events
3796 """Check list of dataset names for impossible ones.
3798 Two kinds of checks are done:
3799 - Checks for things that do not make sense. These lead to
3800 errors and skipped datasets.
3801 - Sanity checks. For these warnings are issued but the user is
3802 considered to be the authoritative expert.
3805 - The CMSSW version encoded in the dataset name should match
3806 self.cmssw_version. This is critical.
3807 - There should be some events in the dataset/run. This is
3808 critical in the sense that CRAB refuses to create jobs for
3809 zero events. And yes, this does happen in practice. E.g. the
3810 reprocessed CRAFT08 datasets contain runs with zero events.
3811 - A cursory check is performed to see if the harvesting type
3812 makes sense for the data type. This should prevent the user
3813 from inadvertently running RelVal for data.
3814 - It is not possible to run single-step harvesting jobs on
3815 samples that are not fully contained at a single site.
3816 - Each dataset/run has to be available at at least one site.
3820 self.logger.
info(
"Performing sanity checks on dataset list...")
3822 dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3824 for dataset_name
in self.datasets_to_use.
keys():
3827 version_from_dataset = self.datasets_information[dataset_name] \
3829 if version_from_dataset != self.cmssw_version:
3830 msg =
" CMSSW version mismatch for dataset `%s' " \
3833 self.cmssw_version, version_from_dataset)
3834 if self.force_running:
3837 "--> `force mode' active: " \
3840 del dataset_names_after_checks[dataset_name]
3842 "--> skipping" % msg)
3853 datatype = self.datasets_information[dataset_name][
"datatype"]
3854 if datatype ==
"data":
3856 if self.harvesting_type !=
"DQMOffline":
3858 elif datatype ==
"mc":
3859 if self.harvesting_type ==
"DQMOffline":
3863 assert False,
"ERROR Impossible data type `%s' " \
3864 "for dataset `%s'" % \
3865 (datatype, dataset_name)
3867 msg =
" Normally one does not run `%s' harvesting " \
3868 "on %s samples, are you sure?" % \
3869 (self.harvesting_type, datatype)
3870 if self.force_running:
3872 "--> `force mode' active: " \
3875 del dataset_names_after_checks[dataset_name]
3877 "--> skipping" % msg)
3891 if datatype ==
"data":
3892 if self.globaltag
is None:
3893 msg =
"For data datasets (like `%s') " \
3894 "we need a GlobalTag" % \
3896 del dataset_names_after_checks[dataset_name]
3898 "--> skipping" % msg)
3908 globaltag = self.datasets_information[dataset_name][
"globaltag"]
3909 if not globaltag
in self.globaltag_check_cache:
3910 if self.check_globaltag(globaltag):
3911 self.globaltag_check_cache.
append(globaltag)
3913 msg =
"Something is wrong with GlobalTag `%s' " \
3914 "used by dataset `%s'!" % \
3915 (globaltag, dataset_name)
3916 if self.use_ref_hists:
3917 msg +=
"\n(Either it does not exist or it " \
3918 "does not contain the required key to " \
3919 "be used with reference histograms.)"
3921 msg +=
"\n(It probably just does not exist.)"
3922 self.logger.fatal(msg)
3928 runs_without_sites = [i
for (i, j)
in \
3929 self.datasets_information[dataset_name] \
3932 i
in self.datasets_to_use[dataset_name]]
3933 if len(runs_without_sites) > 0:
3934 for run_without_sites
in runs_without_sites:
3936 dataset_names_after_checks[dataset_name].
remove(run_without_sites)
3939 self.logger.
warning(
" removed %d unavailable run(s) " \
3940 "from dataset `%s'" % \
3941 (len(runs_without_sites), dataset_name))
3942 self.logger.
debug(
" (%s)" % \
3944 runs_without_sites]))
3950 if not self.harvesting_mode ==
"two-step":
3951 for run_number
in self.datasets_to_use[dataset_name]:
3956 num_sites = len(self.datasets_information[dataset_name] \
3957 [
"sites"][run_number])
3958 if num_sites > 1
and \
3959 not self.datasets_information[dataset_name] \
3960 [
"mirrored"][run_number]:
3964 msg =
" Dataset `%s', run %d is spread across more " \
3965 "than one site.\n" \
3966 " Cannot run single-step harvesting on " \
3967 "samples spread across multiple sites" % \
3968 (dataset_name, run_number)
3970 dataset_names_after_checks[dataset_name].
remove(run_number)
3974 "--> skipping" % msg)
3983 tmp = [j
for (i, j)
in self.datasets_information \
3984 [dataset_name][
"num_events"].
items() \
3985 if i
in self.datasets_to_use[dataset_name]]
3986 num_events_dataset = sum(tmp)
3988 if num_events_dataset < 1:
3989 msg =
" dataset `%s' is empty" % dataset_name
3990 del dataset_names_after_checks[dataset_name]
3992 "--> skipping" % msg)
4005 self.datasets_information[dataset_name] \
4006 [
"num_events"].
items()
if i[1] < 1]
4007 tmp = [i
for i
in tmp
if i[0]
in self.datasets_to_use[dataset_name]]
4008 empty_runs = dict(tmp)
4009 if len(empty_runs) > 0:
4010 for empty_run
in empty_runs:
4012 dataset_names_after_checks[dataset_name].
remove(empty_run)
4015 self.logger.
info(
" removed %d empty run(s) from dataset `%s'" % \
4016 (len(empty_runs), dataset_name))
4017 self.logger.
debug(
" (%s)" % \
4018 ", ".
join([
str(i)
for i
in empty_runs]))
4024 dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4025 for (dataset_name, runs)
in six.iteritems(dataset_names_after_checks):
4027 self.logger.
warning(
" Removing dataset without any runs " \
4030 del dataset_names_after_checks_tmp[dataset_name]
4031 dataset_names_after_checks = dataset_names_after_checks_tmp
4035 self.logger.
warning(
" --> Removed %d dataset(s)" % \
4036 (len(self.datasets_to_use) -
4037 len(dataset_names_after_checks)))
4040 self.datasets_to_use = dataset_names_after_checks
def escape_dataset_name(self, dataset_name):
    """Escape a DBS dataset name.

    Escape a DBS dataset name such that it does not cause trouble
    with the file system. Any leading and trailing `/' characters
    are removed and every remaining `/' is turned into `__'.

    dataset_name -- the DBS dataset name (a string such as `/A/B/C').

    Returns the escaped name (`A__B__C' for the example above).

    """

    # NOTE: str.strip() trims both ends, so a trailing `/' is dropped
    # as well, not just the leading one.
    return dataset_name.strip("/").replace("/", "__")
4066 """Generate the name of the configuration file to be run by
4069 Depending on the harvesting mode (single-step or two-step)
4070 this is the name of the real harvesting configuration or the
4071 name of the first-step ME summary extraction configuration.
4075 if self.harvesting_mode ==
"single-step":
4076 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4077 elif self.harvesting_mode ==
"single-step-allow-partial":
4078 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4085 elif self.harvesting_mode ==
"two-step":
4086 config_file_name = self.create_me_summary_config_file_name(dataset_name)
4088 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4089 self.harvesting_mode
4092 return config_file_name
4098 "Generate the name to be used for the harvesting config file."
4100 file_name_base =
"harvesting.py"
4101 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4102 config_file_name = file_name_base.replace(
".py",
4104 dataset_name_escaped)
4107 return config_file_name
4112 "Generate the name of the ME summary extraction config file."
4114 file_name_base =
"me_extraction.py"
4115 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4116 config_file_name = file_name_base.replace(
".py",
4118 dataset_name_escaped)
4121 return config_file_name
4126 """Create the name of the output file name to be used.
4128 This is the name of the output file of the `first step'. In
4129 the case of single-step harvesting this is already the final
4130 harvesting output ROOT file. In the case of two-step
4131 harvesting it is the name of the intermediary ME summary
4144 if self.harvesting_mode ==
"single-step":
4146 assert not run_number
is None
4148 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4149 elif self.harvesting_mode ==
"single-step-allow-partial":
4151 assert not run_number
is None
4153 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4154 elif self.harvesting_mode ==
"two-step":
4156 assert run_number
is None
4158 output_file_name = self.create_me_summary_output_file_name(dataset_name)
4161 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4162 self.harvesting_mode
4165 return output_file_name
4170 """Generate the name to be used for the harvesting output file.
4172 This harvesting output file is the _final_ ROOT output file
4173 containing the harvesting results. In case of two-step
4174 harvesting there is an intermediate ME output file as well.
4178 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4186 output_file_name =
"DQM_V0001_R%09d__%s.root" % \
4187 (run_number, dataset_name_escaped)
4188 if self.harvesting_mode.
find(
"partial") > -1:
4191 if self.datasets_information[dataset_name] \
4192 [
"mirrored"][run_number] ==
False:
4193 output_file_name = output_file_name.replace(
".root", \
4197 return output_file_name
def create_me_summary_output_file_name(self, dataset_name):
    """Generate the name of the intermediate ME file used in two-step
    harvesting.

    The dataset name is first passed through
    self.escape_dataset_name() and the result is inserted into the
    pattern `me_summary_%s.root'.

    Returns the resulting file name as a string.

    """

    dataset_name_escaped = self.escape_dataset_name(dataset_name)
    output_file_name = "me_summary_%s.root" % \
                       dataset_name_escaped

    # End of create_me_summary_output_file_name.
    return output_file_name
def create_multicrab_block_name(self, dataset_name, run_number, index):
    """Create the block name to use for this dataset/run number.

    This is what appears in the brackets `[]' in multicrab.cfg and
    is used as the name of the job.

    The name is built as `<escaped dataset name>_<run>_<index>',
    with the run number zero-padded to nine digits. The escaping
    is delegated to self.escape_dataset_name().

    Returns the block name as a string.

    """

    dataset_name_escaped = self.escape_dataset_name(dataset_name)
    block_name = "%s_%09d_%s" % (dataset_name_escaped, run_number, index)

    # End of create_multicrab_block_name.
    return block_name
4234 """Create a CRAB configuration for a given job.
4236 NOTE: This is _not_ a complete (as in: submittable) CRAB
4237 configuration. It is used to store the common settings for the
4238 multicrab configuration.
4240 NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4242 NOTE: According to CRAB, you `Must define exactly two of
4243 total_number_of_events, events_per_job, or
4244 number_of_jobs.'. For single-step harvesting we force one job,
4245 for the rest we don't really care.
4248 # With the current version of CRAB (2.6.1), in which Daniele
4249 # fixed the behaviour of no_block_boundary for me, one _has to
4250 # specify_ the total_number_of_events and one single site in
4251 # the se_white_list.
4259 castor_prefix = self.castor_prefix
4261 tmp.append(self.config_file_header())
4266 tmp.append(
"[CRAB]")
4267 tmp.append(
"jobtype = cmssw")
4272 tmp.append(
"[GRID]")
4273 tmp.append(
"virtual_organization=cms")
4278 tmp.append(
"[USER]")
4279 tmp.append(
"copy_data = 1")
4284 tmp.append(
"[CMSSW]")
4285 tmp.append(
"# This reveals data hosted on T1 sites,")
4286 tmp.append(
"# which is normally hidden by CRAB.")
4287 tmp.append(
"show_prod = 1")
4288 tmp.append(
"number_of_jobs = 1")
4289 if self.Jsonlumi ==
True:
4290 tmp.append(
"lumi_mask = %s" % self.Jsonfilename)
4291 tmp.append(
"total_number_of_lumis = -1")
4293 if self.harvesting_type ==
"DQMOffline":
4294 tmp.append(
"total_number_of_lumis = -1")
4296 tmp.append(
"total_number_of_events = -1")
4297 if self.harvesting_mode.
find(
"single-step") > -1:
4298 tmp.append(
"# Force everything to run in one job.")
4299 tmp.append(
"no_block_boundary = 1")
4306 crab_config =
"\n".
join(tmp)
4314 """Create a multicrab.cfg file for all samples.
4316 This creates the contents for a multicrab.cfg file that uses
4317 the crab.cfg file (generated elsewhere) for the basic settings
4318 and contains blocks for each run of each dataset.
4321 # The fact that it's necessary to specify the se_white_list
4322 # and the total_number_of_events is due to our use of CRAB
4323 # version 2.6.1. This should no longer be necessary in the
4329 cmd=
"who i am | cut -f1 -d' '"
4330 (status, output)=commands.getstatusoutput(cmd)
4333 if self.caf_access ==
True:
4334 print(
"Extracting %s as user name" %UserName)
4336 number_max_sites = self.nr_max_sites + 1
4338 multicrab_config_lines = []
4339 multicrab_config_lines.append(self.config_file_header())
4340 multicrab_config_lines.append(
"")
4341 multicrab_config_lines.append(
"[MULTICRAB]")
4342 multicrab_config_lines.append(
"cfg = crab.cfg")
4343 multicrab_config_lines.append(
"")
4345 dataset_names = sorted(self.datasets_to_use.
keys())
4347 for dataset_name
in dataset_names:
4348 runs = self.datasets_to_use[dataset_name]
4349 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4350 castor_prefix = self.castor_prefix
4355 castor_dir = self.datasets_information[dataset_name] \
4356 [
"castor_path"][run]
4358 cmd =
"rfdir %s" % castor_dir
4359 (status, output) = commands.getstatusoutput(cmd)
4361 if len(output) <= 0:
4367 assert (len(self.datasets_information[dataset_name] \
4368 [
"sites"][run]) == 1)
or \
4369 self.datasets_information[dataset_name][
"mirrored"]
4372 site_names = self.datasets_information[dataset_name] \
4373 [
"sites"][run].
keys()
4375 for i
in range(1, number_max_sites, 1):
4376 if len(site_names) > 0:
4377 index =
"site_%02d" % (i)
4379 config_file_name = self. \
4381 output_file_name = self. \
4392 if len(site_names) > 1:
4393 cmssw_version = self.datasets_information[dataset_name] \
4395 self.logger.
info(
"Picking site for mirrored dataset " \
4397 (dataset_name, run))
4398 site_name = self.pick_a_site(site_names, cmssw_version)
4399 if site_name
in site_names:
4400 site_names.remove(site_name)
4403 site_name = site_names[0]
4404 site_names.remove(site_name)
4406 if site_name
is self.no_matching_site_found_str:
4410 nevents = self.datasets_information[dataset_name][
"num_events"][run]
4413 multicrab_block_name = self.create_multicrab_block_name( \
4414 dataset_name, run, index)
4415 multicrab_config_lines.append(
"[%s]" % \
4416 multicrab_block_name)
4420 if site_name ==
"caf.cern.ch":
4421 multicrab_config_lines.append(
"CRAB.use_server=0")
4422 multicrab_config_lines.append(
"CRAB.scheduler=caf")
4424 multicrab_config_lines.append(
"scheduler = glite")
4428 if site_name ==
"caf.cern.ch":
4431 multicrab_config_lines.append(
"GRID.se_white_list = %s" % \
4433 multicrab_config_lines.append(
"# This removes the default blacklisting of T1 sites.")
4434 multicrab_config_lines.append(
"GRID.remove_default_blacklist = 1")
4435 multicrab_config_lines.append(
"GRID.rb = CERN")
4436 if not self.non_t1access:
4437 multicrab_config_lines.append(
"GRID.role = t1access")
4442 castor_dir = castor_dir.replace(castor_prefix,
"")
4443 multicrab_config_lines.append(
"USER.storage_element=srm-cms.cern.ch")
4444 multicrab_config_lines.append(
"USER.user_remote_dir = %s" % \
4446 multicrab_config_lines.append(
"USER.check_user_remote_dir=0")
4448 if site_name ==
"caf.cern.ch":
4449 multicrab_config_lines.append(
"USER.storage_path=%s" % castor_prefix)
4455 multicrab_config_lines.append(
"USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4462 multicrab_config_lines.append(
"CMSSW.pset = %s" % \
4464 multicrab_config_lines.append(
"CMSSW.datasetpath = %s" % \
4466 multicrab_config_lines.append(
"CMSSW.runselection = %d" % \
4469 if self.Jsonlumi ==
True:
4472 if self.harvesting_type ==
"DQMOffline":
4475 multicrab_config_lines.append(
"CMSSW.total_number_of_events = %d" % \
4478 multicrab_config_lines.append(
"CMSSW.output_file = %s" % \
4483 if site_name ==
"caf.cern.ch":
4484 multicrab_config_lines.append(
"CAF.queue=cmscaf1nd")
4488 multicrab_config_lines.append(
"")
4492 self.all_sites_found =
True
4494 multicrab_config =
"\n".
join(multicrab_config_lines)
4497 return multicrab_config
4502 """Check if globaltag exists.
4504 Check if globaltag exists as GlobalTag in the database given
4505 by self.frontier_connection_name['globaltag']. If globaltag is
4506 None, self.globaltag is used instead.
4508 If we're going to use reference histograms this method also
4509 checks for the existence of the required key in the GlobalTag.
4513 if globaltag
is None:
4514 globaltag = self.globaltag
4517 if globaltag.endswith(
"::All"):
4518 globaltag = globaltag[:-5]
4520 connect_name = self.frontier_connection_name[
"globaltag"]
4528 connect_name = connect_name.replace(
"frontier://",
4529 "frontier://cmsfrontier:8000/")
4531 connect_name += self.db_account_name_cms_cond_globaltag()
4533 tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4537 tag_contains_ref_hist_key =
False
4538 if self.use_ref_hists
and tag_exists:
4540 tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4544 if self.use_ref_hists:
4545 ret_val = tag_exists
and tag_contains_ref_hist_key
4547 ret_val = tag_exists
4557 """Check if globaltag exists.
4561 self.logger.
info(
"Checking existence of GlobalTag `%s'" % \
4563 self.logger.
debug(
" (Using database connection `%s')" % \
4566 cmd =
"cmscond_tagtree_list -c %s -T %s" % \
4567 (connect_name, globaltag)
4568 (status, output) = commands.getstatusoutput(cmd)
4570 output.find(
"error") > -1:
4571 msg =
"Could not check existence of GlobalTag `%s' in `%s'" % \
4572 (globaltag, connect_name)
4573 if output.find(
".ALL_TABLES not found") > -1:
4575 "Missing database account `%s'" % \
4576 (msg, output.split(
".ALL_TABLES")[0].
split()[-1])
4577 self.logger.fatal(msg)
4578 self.logger.
debug(
"Command used:")
4579 self.logger.
debug(
" %s" % cmd)
4580 self.logger.
debug(
"Output received:")
4581 self.logger.
debug(output)
4583 if output.find(
"does not exist") > -1:
4584 self.logger.
debug(
"GlobalTag `%s' does not exist in `%s':" % \
4585 (globaltag, connect_name))
4586 self.logger.
debug(
"Output received:")
4587 self.logger.
debug(output)
4591 self.logger.
info(
" GlobalTag exists? -> %s" % tag_exists)
4599 """Check if globaltag contains the required RefHistos key.
4604 tag_contains_key =
None
4605 ref_hist_key =
"RefHistos"
4606 self.logger.
info(
"Checking existence of reference " \
4607 "histogram key `%s' in GlobalTag `%s'" % \
4608 (ref_hist_key, globaltag))
4609 self.logger.
debug(
" (Using database connection `%s')" % \
4611 cmd =
"cmscond_tagtree_list -c %s -T %s -n %s" % \
4612 (connect_name, globaltag, ref_hist_key)
4613 (status, output) = commands.getstatusoutput(cmd)
4615 output.find(
"error") > -1:
4616 msg =
"Could not check existence of key `%s'" % \
4617 (ref_hist_key, connect_name)
4618 self.logger.fatal(msg)
4619 self.logger.
debug(
"Command used:")
4620 self.logger.
debug(
" %s" % cmd)
4621 self.logger.
debug(
"Output received:")
4622 self.logger.
debug(
" %s" % output)
4625 self.logger.
debug(
"Required key for use of reference " \
4626 "histograms `%s' does not exist " \
4627 "in GlobalTag `%s':" % \
4628 (ref_hist_key, globaltag))
4629 self.logger.
debug(
"Output received:")
4630 self.logger.
debug(output)
4631 tag_contains_key =
False
4633 tag_contains_key =
True
4635 self.logger.
info(
" GlobalTag contains `%s' key? -> %s" % \
4636 (ref_hist_key, tag_contains_key))
4639 return tag_contains_key
4644 """Check the existence of tag_name in database connect_name.
4646 Check if tag_name exists as a reference histogram tag in the
4647 database given by self.frontier_connection_name['refhists'].
4651 connect_name = self.frontier_connection_name[
"refhists"]
4652 connect_name += self.db_account_name_cms_cond_dqm_summary()
4654 self.logger.
debug(
"Checking existence of reference " \
4655 "histogram tag `%s'" % \
4657 self.logger.
debug(
" (Using database connection `%s')" % \
4660 cmd =
"cmscond_list_iov -c %s" % \
4662 (status, output) = commands.getstatusoutput(cmd)
4664 msg =
"Could not check existence of tag `%s' in `%s'" % \
4665 (tag_name, connect_name)
4666 self.logger.fatal(msg)
4667 self.logger.
debug(
"Command used:")
4668 self.logger.
debug(
" %s" % cmd)
4669 self.logger.
debug(
"Output received:")
4670 self.logger.
debug(output)
4672 if not tag_name
in output.split():
4673 self.logger.
debug(
"Reference histogram tag `%s' " \
4674 "does not exist in `%s'" % \
4675 (tag_name, connect_name))
4676 self.logger.
debug(
" Existing tags: `%s'" % \
4677 "', `".
join(output.split()))
4681 self.logger.
debug(
" Reference histogram tag exists? " \
4682 "-> %s" % tag_exists)
def create_es_prefer_snippet(self, dataset_name):
    """Build the es_prefer snippet for the reference histograms.

    The building of the snippet is wrapped in some care-taking
    code that figures out the name of the reference histogram set
    (looked up in self.ref_hist_mappings) and the database
    connection string (self.frontier_connection_name['refhists']
    followed by self.db_account_name_cms_cond_dqm_summary()).

    Returns the snippet as a single newline-joined string of Python
    configuration code. A KeyError propagates if dataset_name has no
    entry in self.ref_hist_mappings.

    """

    # Figure out the name of the reference histograms tag.
    ref_hist_tag_name = self.ref_hist_mappings[dataset_name]

    connect_name = self.frontier_connection_name["refhists"]
    connect_name += self.db_account_name_cms_cond_dqm_summary()
    record_name = "DQMReferenceHistogramRootFileRcd"

    # Build up the code snippet. The continuation lines live inside
    # the parentheses of the generated cms.ESSource(...) call, so
    # their leading whitespace is cosmetic only.
    code_lines = [
        "from CondCore.DBCommon.CondDBSetup_cfi import *",
        "process.ref_hist_source = cms.ESSource(\"PoolDBESSource\", CondDBSetup,",
        " connect = cms.string(\"%s\")," % connect_name,
        " toGet = cms.VPSet(cms.PSet(record = cms.string(\"%s\")," % record_name,
        " tag = cms.string(\"%s\"))," % ref_hist_tag_name,
        " )",
        " )",
        "process.es_prefer_ref_hist_source = cms.ESPrefer(\"PoolDBESSource\", \"ref_hist_source\")",
    ]

    snippet = "\n".join(code_lines)

    # End of create_es_prefer_snippet.
    return snippet
4725 """Create the Python harvesting configuration for harvesting.
4727 The basic configuration is created by
4728 Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4729 what cmsDriver.py does.) After that we add some specials
4732 NOTE: On one hand it may not be nice to circumvent
4733 cmsDriver.py, on the other hand cmsDriver.py does not really
4734 do anything itself. All the real work is done by the
4735 ConfigBuilder so there is not much risk that we miss out on
4736 essential developments of cmsDriver in the future.
4741 config_options = defaultOptions
4746 config_options.name =
"harvesting"
4747 config_options.scenario =
"pp"
4748 config_options.number = 1
4749 config_options.arguments = self.ident_string()
4750 config_options.evt_type = config_options.name
4751 config_options.customisation_file =
None
4752 config_options.filein =
"dummy_value"
4753 config_options.filetype =
"EDM"
4755 config_options.gflash =
"dummy_value"
4759 config_options.dbsquery =
""
4766 config_options.step =
"HARVESTING:%s" % \
4767 self.harvesting_info[self.harvesting_type] \
4769 config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4771 config_options.eventcontent = self.harvesting_info \
4772 [self.harvesting_type] \
4774 config_options.harvesting = self.harvesting_info \
4775 [self.harvesting_type] \
4782 datatype = self.datasets_information[dataset_name][
"datatype"]
4783 config_options.isMC = (datatype.lower() ==
"mc")
4784 config_options.isData = (datatype.lower() ==
"data")
4785 globaltag = self.datasets_information[dataset_name][
"globaltag"]
4787 config_options.conditions = self.format_conditions_string(globaltag)
4791 if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4793 config_builder =
ConfigBuilder(config_options, with_input=
True)
4797 config_builder.prepare(
True)
4798 config_contents = config_builder.pythonCfgCode
4807 marker_lines.append(sep)
4808 marker_lines.append(
"# Code between these markers was generated by")
4809 marker_lines.append(
"# Configuration.PyReleaseValidation." \
4812 marker_lines.append(sep)
4813 marker =
"\n".
join(marker_lines)
4815 tmp = [self.config_file_header()]
4819 tmp.append(config_contents)
4823 config_contents =
"\n".
join(tmp)
4828 customisations = [
""]
4830 customisations.append(
"# Now follow some customisations")
4831 customisations.append(
"")
4832 connect_name = self.frontier_connection_name[
"globaltag"]
4833 connect_name += self.db_account_name_cms_cond_globaltag()
4834 customisations.append(
"process.GlobalTag.connect = \"%s\"" % \
4838 if self.saveByLumiSection ==
True:
4839 customisations.append(
"process.dqmSaver.saveByLumiSection = 1")
4843 customisations.append(
"")
4857 use_es_prefer = (self.harvesting_type ==
"RelVal")
4858 use_refs = use_es_prefer
or \
4859 (
not self.harvesting_type ==
"MC")
4861 use_refs = use_refs
and self.use_ref_hists
4869 customisations.append(
"print \"Not using reference histograms\"")
4870 customisations.append(
"if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4871 customisations.append(
" for (sequence_name, sequence) in six.iteritems(process.sequences):")
4872 customisations.append(
" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4873 customisations.append(
" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4874 customisations.append(
" sequence_name")
4875 customisations.append(
"process.dqmSaver.referenceHandling = \"skip\"")
4879 customisations.append(
"process.dqmSaver.referenceHandling = \"all\"")
4881 es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4882 customisations.append(es_prefer_snippet)
4886 workflow_name = dataset_name
4887 if self.harvesting_mode ==
"single-step-allow-partial":
4888 workflow_name +=
"_partial"
4889 customisations.append(
"process.dqmSaver.workflow = \"%s\"" % \
4926 config_contents = config_contents +
"\n".
join(customisations)
4931 return config_contents
4958 tmp.append(self.config_file_header())
4960 tmp.append(
"import FWCore.ParameterSet.Config as cms")
4962 tmp.append(
"process = cms.Process(\"ME2EDM\")")
4964 tmp.append(
"# Import of standard configurations")
4965 tmp.append(
"process.load(\"Configuration/EventContent/EventContent_cff\")")
4967 tmp.append(
"# We don't really process any events, just keep this set to one to")
4968 tmp.append(
"# make sure things work.")
4969 tmp.append(
"process.maxEvents = cms.untracked.PSet(")
4970 tmp.append(
" input = cms.untracked.int32(1)")
4973 tmp.append(
"process.options = cms.untracked.PSet(")
4974 tmp.append(
" Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
4977 tmp.append(
"process.source = cms.Source(\"PoolSource\",")
4978 tmp.append(
" processingMode = \\")
4979 tmp.append(
" cms.untracked.string(\"RunsAndLumis\"),")
4980 tmp.append(
" fileNames = \\")
4981 tmp.append(
" cms.untracked.vstring(\"no_file_specified\")")
4984 tmp.append(
"# Output definition: drop everything except for the monitoring.")
4985 tmp.append(
"process.output = cms.OutputModule(")
4986 tmp.append(
" \"PoolOutputModule\",")
4987 tmp.append(
" outputCommands = \\")
4988 tmp.append(
" cms.untracked.vstring(\"drop *\", \\")
4989 tmp.append(
" \"keep *_MEtoEDMConverter_*_*\"),")
4990 output_file_name = self. \
4992 tmp.append(
" fileName = \\")
4993 tmp.append(
" cms.untracked.string(\"%s\")," % output_file_name)
4994 tmp.append(
" dataset = cms.untracked.PSet(")
4995 tmp.append(
" dataTier = cms.untracked.string(\"RECO\"),")
4996 tmp.append(
" filterName = cms.untracked.string(\"\")")
5000 tmp.append(
"# Additional output definition")
5001 tmp.append(
"process.out_step = cms.EndPath(process.output)")
5003 tmp.append(
"# Schedule definition")
5004 tmp.append(
"process.schedule = cms.Schedule(process.out_step)")
5007 config_contents =
"\n".
join(tmp)
5010 return config_contents
def write_crab_config(self):
    """Write a CRAB job configuration file.

    The contents come from self.create_crab_config() and are
    written to `crab.cfg' in the current working directory.

    Raises Error if the file cannot be written.

    """

    self.logger.info("Writing CRAB configuration...")

    file_name_base = "crab.cfg"

    # Create CRAB configuration.
    crab_contents = self.create_crab_config()

    # Write configuration to file. The `with' block makes sure the
    # file handle is closed even if the write itself fails, and
    # open() (unlike the old file() builtin) also exists in Python 3.
    crab_file_name = file_name_base
    try:
        with open(crab_file_name, "w") as crab_file:
            crab_file.write(crab_contents)
    except IOError:
        self.logger.fatal("Could not write " \
                          "CRAB configuration to file `%s'" % \
                          crab_file_name)
        raise Error("ERROR: Could not write to file `%s'!" % \
                    crab_file_name)

    # End of write_crab_config.
def write_multicrab_config(self):
    """Write a multi-CRAB job configuration file.

    The contents come from self.create_multicrab_config() and are
    written to `multicrab.cfg' in the current working directory.

    Raises Error if the file cannot be written.

    """

    self.logger.info("Writing multi-CRAB configuration...")

    file_name_base = "multicrab.cfg"

    # Create multi-CRAB configuration.
    multicrab_contents = self.create_multicrab_config()

    # Write configuration to file. The `with' block makes sure the
    # file handle is closed even if the write itself fails, and
    # open() (unlike the old file() builtin) also exists in Python 3.
    multicrab_file_name = file_name_base
    try:
        with open(multicrab_file_name, "w") as multicrab_file:
            multicrab_file.write(multicrab_contents)
    except IOError:
        self.logger.fatal("Could not write " \
                          "multi-CRAB configuration to file `%s'" % \
                          multicrab_file_name)
        raise Error("ERROR: Could not write to file `%s'!" % \
                    multicrab_file_name)

    # End of write_multicrab_config.
def write_harvesting_config(self, dataset_name):
    """Write a harvesting job configuration Python file.

    NOTE: This knows nothing about single-step or two-step
    harvesting. That's all taken care of by
    create_harvesting_config.

    The contents come from self.create_harvesting_config(dataset_name).
    NOTE(review): the output name is assumed to come from
    self.create_harvesting_config_file_name(dataset_name) -- confirm
    against the original file.

    Raises Error if the file cannot be written.

    """

    self.logger.debug("Writing harvesting configuration for `%s'..." % \
                      dataset_name)

    # Create Python configuration.
    config_contents = self.create_harvesting_config(dataset_name)

    # Write configuration to file. The `with' block makes sure the
    # file handle is closed even if the write itself fails, and
    # open() (unlike the old file() builtin) also exists in Python 3.
    config_file_name = self. \
                       create_harvesting_config_file_name(dataset_name)
    try:
        with open(config_file_name, "w") as config_file:
            config_file.write(config_contents)
    except IOError:
        self.logger.fatal("Could not write " \
                          "harvesting configuration to file `%s'" % \
                          config_file_name)
        raise Error("ERROR: Could not write to file `%s'!" % \
                    config_file_name)

    # End of write_harvesting_config.
def write_me_extraction_config(self, dataset_name):
    """Write an ME-extraction configuration Python file.

    This `ME-extraction' (ME = Monitoring Element) is the first
    step of the two-step harvesting.

    The contents come from
    self.create_me_extraction_config(dataset_name).
    NOTE(review): the output name is assumed to come from
    self.create_me_summary_config_file_name(dataset_name) -- confirm
    against the original file.

    Raises Error if the file cannot be written.

    """

    self.logger.debug("Writing ME-extraction configuration for `%s'..." % \
                      dataset_name)

    # Create Python configuration.
    config_contents = self.create_me_extraction_config(dataset_name)

    # Write configuration to file. The `with' block makes sure the
    # file handle is closed even if the write itself fails, and
    # open() (unlike the old file() builtin) also exists in Python 3.
    config_file_name = self. \
                       create_me_summary_config_file_name(dataset_name)
    try:
        with open(config_file_name, "w") as config_file:
            config_file.write(config_contents)
    except IOError:
        self.logger.fatal("Could not write " \
                          "ME-extraction configuration to file `%s'" % \
                          config_file_name)
        raise Error("ERROR: Could not write to file `%s'!" % \
                    config_file_name)

    # End of write_me_extraction_config.
def ref_hist_mappings_needed(self, dataset_name=None):
    """Check if we need to load and check the reference mappings.

    For data the reference histograms should be taken
    automatically from the GlobalTag, so we don't need any
    mappings. For RelVals we need to know a mapping to be used in
    the es_prefer code snippet.

    With a dataset_name given, the answer is based on the
    `datatype' stored for that dataset in
    self.datasets_information: True for `mc', False for `data'
    (anything else trips an assertion). Without a dataset_name the
    question is asked for every dataset in
    self.datasets_information and the result is True if any of
    them needs mappings.

    WARNING: This implementation is a bit convoluted.

    """

    if dataset_name is not None:
        data_type = self.datasets_information[dataset_name] \
                    ["datatype"]
        mappings_needed = (data_type == "mc")
        # Sanity check: the only other data type we know is `data'.
        if not mappings_needed:
            assert data_type == "data"
    else:
        # Build the full list first (instead of short-circuiting) so
        # the data type sanity check runs for every dataset.
        tmp = [self.ref_hist_mappings_needed(name) \
               for name in self.datasets_information]
        mappings_needed = any(tmp)

    # End of ref_hist_mappings_needed.
    return mappings_needed
5206 """Load the reference histogram mappings from file.
5208 The dataset name to reference histogram name mappings are read
5209 from a text file specified in self.ref_hist_mappings_file_name.
5214 assert len(self.ref_hist_mappings) < 1, \
5215 "ERROR Should not be RE-loading " \
5216 "reference histogram mappings!"
5219 self.logger.
info(
"Loading reference histogram mappings " \
5220 "from file `%s'" % \
5221 self.ref_hist_mappings_file_name)
5223 mappings_lines =
None
5225 mappings_file =
file(self.ref_hist_mappings_file_name,
"r")
5226 mappings_lines = mappings_file.readlines()
5227 mappings_file.close()
5229 msg =
"ERROR: Could not open reference histogram mapping "\
5230 "file `%s'" % self.ref_hist_mappings_file_name
5231 self.logger.fatal(msg)
5241 for mapping
in mappings_lines:
5243 if not mapping.startswith(
"#"):
5244 mapping = mapping.strip()
5245 if len(mapping) > 0:
5246 mapping_pieces = mapping.split()
5247 if len(mapping_pieces) != 2:
5248 msg =
"ERROR: The reference histogram mapping " \
5249 "file contains a line I don't " \
5250 "understand:\n %s" % mapping
5251 self.logger.fatal(msg)
5253 dataset_name = mapping_pieces[0].
strip()
5254 ref_hist_name = mapping_pieces[1].
strip()
5258 if dataset_name
in self.ref_hist_mappings:
5259 msg =
"ERROR: The reference histogram mapping " \
5260 "file contains multiple mappings for " \
5262 self.logger.fatal(msg)
5266 self.ref_hist_mappings[dataset_name] = ref_hist_name
5270 self.logger.
info(
" Successfully loaded %d mapping(s)" % \
5271 len(self.ref_hist_mappings))
5272 max_len =
max([len(i)
for i
in self.ref_hist_mappings.
keys()])
5273 for (map_from, map_to)
in six.iteritems(self.ref_hist_mappings):
5274 self.logger.
info(
" %-*s -> %s" % \
5275 (max_len, map_from, map_to))
5282 """Make sure all necessary reference histograms exist.
5284 Check that for each of the datasets to be processed a
5285 reference histogram is specified and that that histogram
5286 exists in the database.
5288 NOTE: There's a little complication here. Since this whole
5289 thing was designed to allow (in principle) harvesting of both
5290 data and MC datasets in one go, we need to be careful to check
5291 the availability of reference mappings only for those
5292 datasets that need it.
5296 self.logger.
info(
"Checking reference histogram mappings")
5298 for dataset_name
in self.datasets_to_use:
5300 ref_hist_name = self.ref_hist_mappings[dataset_name]
5302 msg =
"ERROR: No reference histogram mapping found " \
5303 "for dataset `%s'" % \
5305 self.logger.fatal(msg)
5308 if not self.check_ref_hist_tag(ref_hist_name):
5309 msg =
"Reference histogram tag `%s' " \
5310 "(used for dataset `%s') does not exist!" % \
5311 (ref_hist_name, dataset_name)
5312 self.logger.fatal(msg)
5315 self.logger.
info(
" Done checking reference histogram mappings.")
5322 """Obtain all information on the datasets that we need to run.
5324 Use DBS to figure out all required information on our
5325 datasets, like the run numbers and the GlobalTag. All
5326 information is stored in the datasets_information member
5341 self.datasets_information = {}
5342 self.logger.
info(
"Collecting information for all datasets to process")
5343 dataset_names = sorted(self.datasets_to_use.
keys())
5344 for dataset_name
in dataset_names:
5348 self.logger.
info(sep_line)
5349 self.logger.
info(
" `%s'" % dataset_name)
5350 self.logger.
info(sep_line)
5352 runs = self.dbs_resolve_runs(dataset_name)
5353 self.logger.
info(
" found %d run(s)" % len(runs))
5355 self.logger.
debug(
" run number(s): %s" % \
5356 ", ".
join([
str(i)
for i
in runs]))
5360 self.logger.
warning(
" --> skipping dataset "
5362 assert False,
"Panic: found a dataset without runs " \
5366 cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5367 self.logger.
info(
" found CMSSW version `%s'" % cmssw_version)
5370 datatype = self.dbs_resolve_datatype(dataset_name)
5371 self.logger.
info(
" sample is data or MC? --> %s" % \
5377 if self.globaltag
is None:
5378 globaltag = self.dbs_resolve_globaltag(dataset_name)
5380 globaltag = self.globaltag
5382 self.logger.
info(
" found GlobalTag `%s'" % globaltag)
5388 assert datatype ==
"data", \
5389 "ERROR Empty GlobalTag for MC dataset!!!"
5397 sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5401 for run_number
in sites_catalog.keys():
5402 num_events[run_number] = sites_catalog \
5403 [run_number][
"all_sites"]
5404 del sites_catalog[run_number][
"all_sites"]
5409 for run_number
in sites_catalog.keys():
5410 mirror_catalog[run_number] = sites_catalog \
5411 [run_number][
"mirrored"]
5412 del sites_catalog[run_number][
"mirrored"]
5441 self.datasets_information[dataset_name] = {}
5442 self.datasets_information[dataset_name][
"runs"] = runs
5443 self.datasets_information[dataset_name][
"cmssw_version"] = \
5445 self.datasets_information[dataset_name][
"globaltag"] = globaltag
5446 self.datasets_information[dataset_name][
"datatype"] = datatype
5447 self.datasets_information[dataset_name][
"num_events"] = num_events
5448 self.datasets_information[dataset_name][
"mirrored"] = mirror_catalog
5449 self.datasets_information[dataset_name][
"sites"] = sites_catalog
5453 castor_path_common = self.create_castor_path_name_common(dataset_name)
5454 self.logger.
info(
" output will go into `%s'" % \
5458 [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5460 for path_name
in castor_paths.values():
5461 self.logger.
debug(
" %s" % path_name)
5462 self.datasets_information[dataset_name][
"castor_path"] = \
5470 """Tell the user what to do now, after this part is done.
5472 This should provide the user with some (preferably
5473 copy-pasteable) instructions on what to do now with the setups
5474 and files that have been created.
5484 self.logger.
info(
"")
5485 self.logger.
info(sep_line)
5486 self.logger.
info(
" Configuration files have been created.")
5487 self.logger.
info(
" From here on please follow the usual CRAB instructions.")
5488 self.logger.
info(
" Quick copy-paste instructions are shown below.")
5489 self.logger.
info(sep_line)
5491 self.logger.
info(
"")
5492 self.logger.
info(
" Create all CRAB jobs:")
5493 self.logger.
info(
" multicrab -create")
5494 self.logger.
info(
"")
5495 self.logger.
info(
" Submit all CRAB jobs:")
5496 self.logger.
info(
" multicrab -submit")
5497 self.logger.
info(
"")
5498 self.logger.
info(
" Check CRAB status:")
5499 self.logger.
info(
" multicrab -status")
5500 self.logger.
info(
"")
5502 self.logger.
info(
"")
5503 self.logger.
info(
" For more information please see the CMS Twiki:")
5504 self.logger.
info(
" %s" % twiki_url)
5505 self.logger.
info(sep_line)
5509 if not self.all_sites_found:
5510 self.logger.
warning(
" For some of the jobs no matching " \
5511 "site could be found")
5512 self.logger.
warning(
" --> please scan your multicrab.cfg" \
5513 "for occurrences of `%s'." % \
5514 self.no_matching_site_found_str)
5515 self.logger.
warning(
" You will have to fix those " \
5523 "Main entry point of the CMS harvester."
5533 self.parse_cmd_line_options()
5535 self.check_input_status()
5548 self.setup_harvesting_info()
5551 self.build_dataset_use_list()
5553 self.build_dataset_ignore_list()
5556 self.build_runs_use_list()
5557 self.build_runs_ignore_list()
5564 self.process_dataset_ignore_list()
5568 self.build_datasets_information()
5570 if self.use_ref_hists
and \
5571 self.ref_hist_mappings_needed():
5574 self.load_ref_hist_mappings()
5578 self.check_ref_hist_mappings()
5580 self.logger.
info(
"No need to load reference " \
5581 "histogram mappings file")
5596 self.process_runs_use_and_ignore_lists()
5601 if self.harvesting_mode ==
"single-step-allow-partial":
5602 self.singlify_datasets()
5605 self.check_dataset_list()
5607 if len(self.datasets_to_use) < 1:
5608 self.logger.
info(
"After all checks etc. " \
5609 "there are no datasets (left?) " \
5613 self.logger.
info(
"After all checks etc. we are left " \
5614 "with %d dataset(s) to process " \
5615 "for a total of %d runs" % \
5616 (len(self.datasets_to_use),
5617 sum([len(i)
for i
in \
5618 self.datasets_to_use.
values()])))
5645 self.create_and_check_castor_dirs()
5649 self.write_crab_config()
5650 self.write_multicrab_config()
5660 for dataset_name
in self.datasets_to_use.
keys():
5662 self.write_harvesting_config(dataset_name)
5663 if self.harvesting_mode ==
"two-step":
5664 self.write_me_extraction_config(dataset_name)
5672 for run_number
in self.datasets_to_use[dataset_name]:
5673 tmp[run_number] = self.datasets_information \
5674 [dataset_name][
"num_events"][run_number]
5675 if dataset_name
in self.book_keeping_information:
5676 self.book_keeping_information[dataset_name].
update(tmp)
5678 self.book_keeping_information[dataset_name] = tmp
5681 self.show_exit_message()
5683 except Usage
as err:
5688 except Error
as err:
5692 except Exception
as err:
5701 if isinstance(err, SystemExit):
5702 self.logger.fatal(err.code)
5703 elif not isinstance(err, KeyboardInterrupt):
5704 self.logger.fatal(
"!" * 50)
5705 self.logger.fatal(
" This looks like a serious problem.")
5706 self.logger.fatal(
" If you are sure you followed all " \
5708 self.logger.fatal(
" please copy the below stack trace together")
5709 self.logger.fatal(
" with a description of what you were doing to")
5710 self.logger.fatal(
" jeroen.hegeman@cern.ch.")
5711 self.logger.fatal(
" %s" % self.ident_string())
5712 self.logger.fatal(
"!" * 50)
5713 self.logger.fatal(
str(err))
5715 traceback_string = traceback.format_exc()
5716 for line
in traceback_string.split(
"\n"):
5717 self.logger.fatal(line)
5718 self.logger.fatal(
"!" * 50)
5733 if self.crab_submission ==
True:
5734 os.system(
"multicrab -create")
5735 os.system(
"multicrab -submit")
5746 if __name__ ==
"__main__":
5747 "Main entry point for harvesting."