15 """Main program to run all kinds of harvesting.
17 These are the basic kinds of harvesting implemented (contact me if
18 your favourite is missing):
20 - RelVal : Run for release validation samples. Makes heavy use of MC
23 - RelValFS: FastSim RelVal.
25 - MC : Run for MC samples.
27 - DQMOffline : Run for real data (could also be run for MC).
29 For the mappings of these harvesting types to sequence names please
30 see the setup_harvesting_info() and option_handler_list_types()
37 __version__ =
"3.8.2p1"
38 __author__ =
"Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
39 "Niklas Pietsch (niklas.pietsch@desy.de)"
41 twiki_url =
"https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
from inspect import getargspec
from random import choice

from DBSAPI.dbsApi import DbsApi

global SAXParseException
from xml.sax import SAXParseException

import Configuration.PyReleaseValidation
from Configuration.PyReleaseValidation.ConfigBuilder import \
     ConfigBuilder, defaultOptions
        return repr(self.msg)

        return repr(self.msg)
150 """Helper class to add some customised help output to cmsHarvester.
152 We want to add some instructions, as well as a pointer to the CMS
162 usage_lines.append(sep_line)
163 usage_lines.append(
"Welcome to the CMS harvester, a (hopefully useful)")
164 usage_lines.append(
"tool to create harvesting configurations.")
165 usage_lines.append(
"For more information please have a look at the CMS Twiki:")
166 usage_lines.append(
" %s" % twiki_url)
167 usage_lines.append(sep_line)
168 usage_lines.append(
"")
172 usage_lines.append(optparse.IndentedHelpFormatter. \
175 formatted_usage =
"\n".
join(usage_lines)
176 return formatted_usage
185 """XML handler class to parse DBS results.
187 The tricky thing here is that older DBS versions (2.0.5 and
188 earlier) return results in a different XML format than newer
189 versions. Previously the result values were returned as attributes
190 to the `result' element. The new approach returns result values as
191 contents of named elements.
193 The old approach is handled directly in startElement(), the new
194 approach in characters().
196 NOTE: All results are returned in the form of string values of
207 "dataset.tag" :
"PROCESSEDDATASET_GLOBALTAG",
208 "datatype.type" :
"PRIMARYDSTYPE_TYPE",
209 "run" :
"RUNS_RUNNUMBER",
210 "run.number" :
"RUNS_RUNNUMBER",
211 "file.name" :
"FILES_LOGICALFILENAME",
212 "file.numevents" :
"FILES_NUMBEROFEVENTS",
213 "algo.version" :
"APPVERSION_VERSION",
214 "site" :
"STORAGEELEMENT_SENAME",
        self.element_position.append(name)

            key = DBSXMLHandler.mapping[name]
            value = str(attrs[key])

               "closing unopened element `%s'" % name

        self.element_position.pop()

            self.current_value.append(content)

        """Make sure that all results arrays have equal length.

        We should have received complete rows from DBS. I.e. all
        results arrays in the handler should be of equal length.

        """

        res_names = self.results.keys()
        if len(res_names) > 1:
            for res_name in res_names[1:]:
                res_tmp = self.results[res_name]
                if len(res_tmp) != len(self.results[res_names[0]]):
                    results_valid = False
293 """Class to perform CMS harvesting.
295 More documentation `obviously' to follow.
302 "Initialize class and process command line options."
329 "single-step-allow-partial",
359 for key
in self.frontier_connection_name.keys():
408 self.Jsonlumi =
False
419 self.castor_base_dir =
None
420 self.castor_base_dir_default =
"/castor/cern.ch/" \
422 "dqm/offline/harvesting_output/"
427 self.book_keeping_file_name_default =
"harvesting_accounting.txt"
434 self.ref_hist_mappings_file_name_default =
"harvesting_ref_hist_mappings.txt"
439 self.castor_prefix =
"/castor/cern.ch"
445 self.non_t1access =
False
446 self.caf_access =
False
447 self.saveByLumiSection =
False
448 self.crab_submission =
False
449 self.nr_max_sites = 1
451 self.preferred_site =
"no preference"
454 self.datasets_to_use = {}
456 self.datasets_to_ignore = {}
458 self.book_keeping_information = {}
461 self.ref_hist_mappings = {}
465 self.runs_to_use = {}
466 self.runs_to_ignore = {}
469 self.sites_and_versions_cache = {}
472 self.globaltag_check_cache = []
476 self.all_sites_found =
True
479 self.no_matching_site_found_str =
"no_matching_site_found"
482 if cmd_line_opts
is None:
483 cmd_line_opts = sys.argv[1:]
487 log_handler = logging.StreamHandler()
490 log_formatter = logging.Formatter(
"%(message)s")
491 log_handler.setFormatter(log_formatter)
492 logger = logging.getLogger()
494 logger.addHandler(log_handler)
506 "Clean up after ourselves."
518 "Create a timestamp to use in the created config files."
520 time_now = datetime.datetime.utcnow()
522 time_now = time_now.replace(microsecond = 0)
523 time_stamp =
"%sUTC" % datetime.datetime.isoformat(time_now)
531 "Spit out an identification string for cmsHarvester.py."
533 ident_str =
"`cmsHarvester.py " \
534 "version %s': cmsHarvester.py %s" % \
536 reduce(
lambda x, y: x+
' '+y, sys.argv[1:]))
543 """Create the conditions string needed for `cmsDriver'.
545 Just glueing the FrontierConditions bit in front of it really.
556 if globaltag.lower().
find(
"conditions") > -1:
557 conditions_string = globaltag
559 conditions_string =
"FrontierConditions_GlobalTag,%s" % \
563 return conditions_string
568 """Return the database account name used to store the GlobalTag.
570 The name of the database account depends (albeit weakly) on
571 the CMSSW release version.
577 account_name =
"CMS_COND_31X_GLOBALTAG"
585 """See db_account_name_cms_cond_globaltag."""
588 version = self.cmssw_version[6:11]
589 if version <
"3_4_0":
590 account_name =
"CMS_COND_31X_DQM_SUMMARY"
592 account_name =
"CMS_COND_34X"
600 "Create a nice header to be used to mark the generated files."
606 tmp.append(
"# %s" % time_stamp)
607 tmp.append(
"# WARNING: This file was created automatically!")
609 tmp.append(
"# Created by %s" % ident_str)
611 header =
"\n".
join(tmp)
619 """Adjust the level of output generated.
622 - normal : default level of output
623 - quiet : less output than the default
624 - verbose : some additional information
625 - debug : lots more information, may be overwhelming
627 NOTE: The debug option is a bit special in the sense that it
628 also modifies the output format.
635 "NORMAL" : logging.INFO,
636 "QUIET" : logging.WARNING,
637 "VERBOSE" : logging.INFO,
638 "DEBUG" : logging.DEBUG
641 output_level = output_level.upper()
649 self.logger.fatal(
"Unknown output level `%s'" % ouput_level)
658 """Switch to debug mode.
660 This both increases the amount of output generated, as well as
661 changes the format used (more detailed information is given).
666 log_formatter_debug = logging.Formatter(
"[%(levelname)s] " \
676 log_handler = self.logger.handlers[0]
677 log_handler.setFormatter(log_formatter_debug)
685 "Switch to quiet mode: less verbose."
694 """Switch on `force mode' in which case we don't brake for nobody.
696 In so-called `force mode' all sanity checks are performed but
697 we don't halt on failure. Of course this requires some care
702 self.logger.debug(
"Switching on `force mode'.")
710 """Set the harvesting type to be used.
712 This checks that no harvesting type is already set, and sets
713 the harvesting type to be used to the one specified. If a
714 harvesting type is already set an exception is thrown. The
715 same happens when an unknown type is specified.
724 value = value.lower()
727 type_index = harvesting_types_lowered.index(value)
731 self.logger.fatal(
"Unknown harvesting type `%s'" % \
733 self.logger.fatal(
" possible types are: %s" %
735 raise Usage(
"Unknown harvesting type `%s'" % \
741 msg =
"Only one harvesting type should be specified"
742 self.logger.fatal(msg)
746 self.logger.info(
"Harvesting type to be used: `%s'" % \
754 """Set the harvesting mode to be used.
756 Single-step harvesting can be used for samples that are
757 located completely at a single site (= SE). Otherwise use
763 harvesting_mode = value.lower()
765 msg =
"Unknown harvesting mode `%s'" % harvesting_mode
766 self.logger.fatal(msg)
767 self.logger.fatal(
" possible modes are: %s" % \
774 msg =
"Only one harvesting mode should be specified"
775 self.logger.fatal(msg)
779 self.logger.info(
"Harvesting mode to be used: `%s'" % \
787 """Set the GlobalTag to be used, overriding our own choices.
789 By default the cmsHarvester will use the GlobalTag with which
790 a given dataset was created also for the harvesting. The
791 --globaltag option is the way to override this behaviour.
797 msg =
"Only one GlobalTag should be specified"
798 self.logger.fatal(msg)
802 self.logger.info(
"GlobalTag to be used: `%s'" % \
810 "Switch use of all reference histograms off."
814 self.logger.warning(
"Switching off all use of reference histograms")
822 """Override the default Frontier connection string.
824 Please only use this for testing (e.g. when a test payload has
825 been inserted into cms_orc_off instead of cms_orc_on).
827 This method gets called for three different command line
829 - --frontier-connection,
830 - --frontier-connection-for-globaltag,
831 - --frontier-connection-for-refhists.
832 Appropriate care has to be taken to make sure things are only
838 frontier_type = opt_str.split(
"-")[-1]
839 if frontier_type ==
"connection":
841 frontier_types = self.frontier_connection_name.keys()
843 frontier_types = [frontier_type]
847 for connection_name
in frontier_types:
849 msg =
"Please specify either:\n" \
850 " `--frontier-connection' to change the " \
851 "Frontier connection used for everything, or\n" \
852 "either one or both of\n" \
853 " `--frontier-connection-for-globaltag' to " \
854 "change the Frontier connection used for the " \
856 " `--frontier-connection-for-refhists' to change " \
857 "the Frontier connection used for the " \
858 "reference histograms."
859 self.logger.fatal(msg)
862 frontier_prefix =
"frontier://"
863 if not value.startswith(frontier_prefix):
864 msg =
"Expecting Frontier connections to start with " \
865 "`%s'. You specified `%s'." % \
866 (frontier_prefix, value)
867 self.logger.fatal(msg)
871 if value.find(
"FrontierProd") < 0
and \
872 value.find(
"FrontierProd") < 0:
873 msg =
"Expecting Frontier connections to contain either " \
874 "`FrontierProd' or `FrontierPrep'. You specified " \
875 "`%s'. Are you sure?" % \
877 self.logger.warning(msg)
879 if not value.endswith(
"/"):
882 for connection_name
in frontier_types:
886 frontier_type_str =
"unknown"
887 if connection_name ==
"globaltag":
888 frontier_type_str =
"the GlobalTag"
889 elif connection_name ==
"refhists":
890 frontier_type_str =
"the reference histograms"
892 self.logger.warning(
"Overriding default Frontier " \
893 "connection for %s " \
        if opt_str.lower().find("ignore") > -1:

        if opt_str.lower().find("dataset") > -1:
            select_type = "datasets"

        if not self.input_method[select_type][spec_type] is None:
            msg = "Please only specify one input method " \
                  "(for the `%s' case)" % opt_str
            self.logger.fatal(msg)

        input_method = opt_str.replace("-", "").replace("ignore", "")
        self.input_method[select_type][spec_type] = input_method
        self.input_name[select_type][spec_type] = value

        self.logger.debug("Input method for the `%s' case: %s" % \
                          (spec_type, input_method))
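        # Worked example (illustrative): for opt_str = "--dataset-ignore" the
        # code above ends up with select_type = "datasets", spec_type =
        # "ignore" and input_method = "dataset" (the dashes and the word
        # "ignore" are stripped off), while the given value is remembered as
        # the corresponding input_name.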
960 """Store the name of the file to be used for book keeping.
962 The only check done here is that only a single book keeping
970 msg =
"Only one book keeping file should be specified"
971 self.logger.fatal(msg)
975 self.logger.info(
"Book keeping file to be used: `%s'" % \
983 """Store the name of the file for the ref. histogram mapping.
990 msg =
"Only one reference histogram mapping file " \
991 "should be specified"
992 self.logger.fatal(msg)
996 self.logger.info(
"Reference histogram mapping file " \
997 "to be used: `%s'" % \
1059 """Specify where on CASTOR the output should go.
1061 At the moment only output to CERN CASTOR is
1062 supported. Eventually the harvested results should go into the
1063 central place for DQM on CASTOR anyway.
1070 castor_prefix = self.castor_prefix
1073 castor_dir = os.path.join(os.path.sep, castor_dir)
1074 self.castor_base_dir = os.path.normpath(castor_dir)
1076 self.logger.info(
"CASTOR (base) area to be used: `%s'" % \
1077 self.castor_base_dir)
1084 """Set the self.no_t1access flag to try and create jobs that
1085 run without special `t1access' role.
1089 self.non_t1access =
True
1091 self.logger.warning(
"Running in `non-t1access' mode. " \
1092 "Will try to create jobs that run " \
1093 "without special rights but no " \
1094 "further promises...")
1101 """Set the self.caf_access flag to try and create jobs that
1105 self.caf_access =
True
1107 self.logger.warning(
"Running in `caf_access' mode. " \
1108 "Will try to create jobs that run " \
1110 "further promises...")
1117 """Set process.dqmSaver.saveByLumiSectiont=1 in cfg harvesting file
1119 self.saveByLumiSection =
True
1121 self.logger.warning(
"waning concerning saveByLumiSection option")
1129 """Crab jobs are not created and
1130 "submitted automatically",
1132 self.crab_submission =
True
1140 self.nr_max_sites = value
1146 self.preferred_site = value
1151 """List all harvesting types and their mappings.
1153 This lists all implemented harvesting types with their
1154 corresponding mappings to sequence names. This had to be
1155 separated out from the help since it depends on the CMSSW
1156 version and was making things a bit of a mess.
1158 NOTE: There is no way (at least not that I could come up with)
1159 to code this in a neat generic way that can be read both by
1160 this method and by setup_harvesting_info(). Please try hard to
1161 keep these two methods in sync!
1166 sep_line_short =
"-" * 20
1169 print "The following harvesting types are available:"
1172 print "`RelVal' maps to:"
1173 print " pre-3_3_0 : HARVESTING:validationHarvesting"
1174 print " 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting"
1175 print " Exceptions:"
1176 print " 3_3_0_pre1-4 : HARVESTING:validationHarvesting"
1177 print " 3_3_0_pre6 : HARVESTING:validationHarvesting"
1178 print " 3_4_0_pre1 : HARVESTING:validationHarvesting"
1180 print sep_line_short
1182 print "`RelValFS' maps to:"
1183 print " always : HARVESTING:validationHarvestingFS"
1185 print sep_line_short
1187 print "`MC' maps to:"
1188 print " always : HARVESTING:validationprodHarvesting"
1190 print sep_line_short
1192 print "`DQMOffline' maps to:"
1193 print " always : HARVESTING:dqmHarvesting"
1206 """Fill our dictionary with all info needed to understand
1209 This depends on the CMSSW version since at some point the
1210 names and sequences were modified.
1212 NOTE: There is no way (at least not that I could come up with)
1213 to code this in a neat generic way that can be read both by
1214 this method and by option_handler_list_types(). Please try
1215 hard to keep these two methods in sync!
1219 assert not self.cmssw_version
is None, \
1220 "ERROR setup_harvesting() requires " \
1221 "self.cmssw_version to be set!!!"
1223 harvesting_info = {}
1226 harvesting_info[
"DQMOffline"] = {}
1227 harvesting_info[
"DQMOffline"][
"beamspot"] =
None
1228 harvesting_info[
"DQMOffline"][
"eventcontent"] =
None
1229 harvesting_info[
"DQMOffline"][
"harvesting"] =
"AtRunEnd"
1231 harvesting_info[
"RelVal"] = {}
1232 harvesting_info[
"RelVal"][
"beamspot"] =
None
1233 harvesting_info[
"RelVal"][
"eventcontent"] =
None
1234 harvesting_info[
"RelVal"][
"harvesting"] =
"AtRunEnd"
1236 harvesting_info[
"RelValFS"] = {}
1237 harvesting_info[
"RelValFS"][
"beamspot"] =
None
1238 harvesting_info[
"RelValFS"][
"eventcontent"] =
None
1239 harvesting_info[
"RelValFS"][
"harvesting"] =
"AtRunEnd"
1241 harvesting_info[
"MC"] = {}
1242 harvesting_info[
"MC"][
"beamspot"] =
None
1243 harvesting_info[
"MC"][
"eventcontent"] =
None
1244 harvesting_info[
"MC"][
"harvesting"] =
"AtRunEnd"
1254 assert self.cmssw_version.startswith(
"CMSSW_")
1257 version = self.cmssw_version[6:]
1263 if version <
"3_3_0":
1264 step_string =
"validationHarvesting"
1265 elif version
in [
"3_3_0_pre1",
"3_3_0_pre2",
1266 "3_3_0_pre3",
"3_3_0_pre4",
1267 "3_3_0_pre6",
"3_4_0_pre1"]:
1268 step_string =
"validationHarvesting"
1270 step_string =
"validationHarvesting+dqmHarvesting"
1272 harvesting_info[
"RelVal"][
"step_string"] = step_string
1276 assert not step_string
is None, \
1277 "ERROR Could not decide a RelVal harvesting sequence " \
1278 "for CMSSW version %s" % self.cmssw_version
1284 step_string =
"validationHarvestingFS"
1286 harvesting_info[
"RelValFS"][
"step_string"] = step_string
1291 step_string =
"validationprodHarvesting"
1293 harvesting_info[
"MC"][
"step_string"] = step_string
1297 assert not step_string
is None, \
1298 "ERROR Could not decide a MC harvesting " \
1299 "sequence for CMSSW version %s" % self.cmssw_version
1305 step_string =
"dqmHarvesting"
1307 harvesting_info[
"DQMOffline"][
"step_string"] = step_string
1311 self.harvesting_info = harvesting_info
1313 self.logger.info(
"Based on the CMSSW version (%s) " \
1314 "I decided to use the `HARVESTING:%s' " \
1315 "sequence for %s harvesting" % \
1316 (self.cmssw_version,
1317 self.harvesting_info[self.harvesting_type][
"step_string"],
1318 self.harvesting_type))
1325 """Build the common part of the output path to be used on
1328 This consists of the CASTOR area base path specified by the
1329 user and a piece depending on the data type (data vs. MC), the
1330 harvesting type and the dataset name followed by a piece
1331 containing the run number and event count. (See comments in
1332 create_castor_path_name_special for details.) This method
1333 creates the common part, without run number and event count.
1337 castor_path = self.castor_base_dir
1342 datatype = self.datasets_information[dataset_name][
"datatype"]
1343 datatype = datatype.lower()
1344 castor_path = os.path.join(castor_path, datatype)
1347 harvesting_type = self.harvesting_type
1348 harvesting_type = harvesting_type.lower()
1349 castor_path = os.path.join(castor_path, harvesting_type)
1359 release_version = self.cmssw_version
1360 release_version = release_version.lower(). \
1363 castor_path = os.path.join(castor_path, release_version)
1366 dataset_name_escaped = self.escape_dataset_name(dataset_name)
1367 castor_path = os.path.join(castor_path, dataset_name_escaped)
1371 castor_path = os.path.normpath(castor_path)
1379 dataset_name, run_number,
1380 castor_path_common):
1381 """Create the specialised part of the CASTOR output dir name.
1383 NOTE: To avoid clashes with `incremental harvesting'
1384 (re-harvesting when a dataset grows) we have to include the
1385 event count in the path name. The underlying `problem' is that
1386 CRAB does not overwrite existing output files so if the output
1387 file already exists CRAB will fail to copy back the output.
1389 NOTE: It's not possible to create different kinds of
1390 harvesting jobs in a single call to this tool. However, in
1391 principle it could be possible to create both data and MC jobs
1394 NOTE: The number of events used in the path name is the
1395 _total_ number of events in the dataset/run at the time of
1396 harvesting. If we're doing partial harvesting the final
1397 results will reflect lower statistics. This is a) the easiest
1398 to code and b) the least likely to lead to confusion if
1399 someone ever decides to swap/copy around file blocks between
1404 castor_path = castor_path_common
1409 castor_path = os.path.join(castor_path,
"run_%d" % run_number)
1417 castor_path = os.path.join(castor_path,
"nevents")
1421 castor_path = os.path.normpath(castor_path)
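        # Sketch of the resulting layout (anything not visible in the code
        # above is an assumption):
        #
        #   <castor_base_dir>/<datatype>/<harvesting_type>/<release>/
        #       <escaped_dataset_name>/run_<run_number>/nevents...
        #
        # i.e. the `common' part from create_castor_path_name_common plus the
        # run-number and event-count pieces appended here.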
1429 """Make sure all required CASTOR output dirs exist.
1431 This checks the CASTOR base dir specified by the user as well
1432 as all the subdirs required by the current set of jobs.
1436 self.logger.info(
"Checking (and if necessary creating) CASTOR " \
1437 "output area(s)...")
1440 self.create_and_check_castor_dir(self.castor_base_dir)
1444 for (dataset_name, runs)
in self.datasets_to_use.iteritems():
1447 castor_dirs.append(self.datasets_information[dataset_name] \
1448 [
"castor_path"][run])
1449 castor_dirs_unique =
list(set(castor_dirs))
1450 castor_dirs_unique.sort()
1454 ndirs = len(castor_dirs_unique)
1455 step =
max(ndirs / 10, 1)
1456 for (i, castor_dir)
in enumerate(castor_dirs_unique):
1457 if (i + 1) % step == 0
or \
1459 self.logger.info(
" %d/%d" % \
1461 self.create_and_check_castor_dir(castor_dir)
1468 self.logger.debug(
"Checking if path `%s' is empty" % \
1470 cmd =
"rfdir %s" % castor_dir
1471 (status, output) = commands.getstatusoutput(cmd)
1473 msg =
"Could not access directory `%s'" \
1474 " !!! This is bad since I should have just" \
1475 " created it !!!" % castor_dir
1476 self.logger.fatal(msg)
1479 self.logger.warning(
"Output directory `%s' is not empty:" \
1480 " new jobs will fail to" \
1481 " copy back output" % \
1489 """Check existence of the give CASTOR dir, if necessary create
1492 Some special care has to be taken with several things like
1493 setting the correct permissions such that CRAB can store the
1494 output results. Of course this means that things like
1495 /castor/cern.ch/ and user/j/ have to be recognised and treated
1498 NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1501 NOTE: This method uses some slightly tricky caching to make
1502 sure we don't keep over and over checking the same base paths.
1509 def split_completely(path):
1510 (parent_path, name) = os.path.split(path)
1512 return (parent_path, )
1514 return split_completely(parent_path) + (name, )
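            # Illustration of what the complete helper does: it recursively
            # breaks a path up into all of its components, e.g.
            #
            #   split_completely("/castor/cern.ch/cms")
            #   --> ("/", "castor", "cern.ch", "cms")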
        def extract_permissions(rfstat_output):
            """Parse the output from rfstat and return the
            5-digit permissions string."""

            permissions_line = [i for i in rfstat_output.split("\n") \
                                if i.lower().find("protection") > -1]
            regexp = re.compile(".*\(([0123456789]{5})\).*")
            match = regexp.search(rfstat_output)
            if not match or len(match.groups()) != 1:
                msg = "Could not extract permissions " \
                      "from output: %s" % rfstat_output
                self.logger.fatal(msg)

            permissions = match.group(1)
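            # Illustration (the exact rfstat output format is an assumption):
            # a line like
            #   Protection : -rwxr-xr-x (40755)
            # would yield the permissions string "40755".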
        castor_paths_dont_touch = {
            0: ["/", "castor", "cern.ch", "cms", "store", "temp",
                "dqm", "offline", "user"],
            -1: ["user", "store"]
            }

        self.logger.debug("Checking CASTOR path `%s'" % castor_dir)

        castor_path_pieces = split_completely(castor_dir)

        check_sizes = castor_paths_dont_touch.keys()

        len_castor_path_pieces = len(castor_path_pieces)
        for piece_index in xrange(len_castor_path_pieces):
            skip_this_path_piece = False
            piece = castor_path_pieces[piece_index]

            for check_size in check_sizes:
                if (piece_index + check_size) > -1:
                    if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
                        skip_this_path_piece = True

            path = os.path.join(path, piece)

                if path in self.castor_path_checks_cache:
            except AttributeError:
                self.castor_path_checks_cache = []
            self.castor_path_checks_cache.append(path)

            if not skip_this_path_piece:

                self.logger.debug("Checking if path `%s' exists" % \
                cmd = "rfstat %s" % path
                (status, output) = commands.getstatusoutput(cmd)

                    self.logger.debug("Creating path `%s'" % path)
                    cmd = "nsmkdir -m 775 %s" % path
                    (status, output) = commands.getstatusoutput(cmd)
                        msg = "Could not create directory `%s'" % path
                        self.logger.fatal(msg)
                    cmd = "rfstat %s" % path
                    (status, output) = commands.getstatusoutput(cmd)

                permissions = extract_permissions(output)
                if not permissions.startswith("40"):
                    msg = "Path `%s' is not a directory(?)" % path
                    self.logger.fatal(msg)

                self.logger.debug("Checking permissions for path `%s'" % path)
                cmd = "rfstat %s" % path
                (status, output) = commands.getstatusoutput(cmd)
                    msg = "Could not obtain permissions for directory `%s'" % \
                    self.logger.fatal(msg)

                permissions = extract_permissions(output)[-3:]

                if piece_index == (len_castor_path_pieces - 1):
                    permissions_target = "775"

                    permissions_target = "775"

                permissions_new = []
                for (i, j) in zip(permissions, permissions_target):
                    permissions_new.append(str(max(int(i), int(j))))
                permissions_new = "".join(permissions_new)
                self.logger.debug("  current permissions: %s" % \
                self.logger.debug("  target permissions : %s" % \
                if permissions_new != permissions:
                    self.logger.debug("Changing permissions of `%s' " \
                                      "to %s (were %s)" % \
                                      (path, permissions_new, permissions))
                    cmd = "rfchmod %s %s" % (permissions_new, path)
                    (status, output) = commands.getstatusoutput(cmd)
                        msg = "Could not change permissions for path `%s' " \
                              "to %s" % (path, permissions_new)
                        self.logger.fatal(msg)

                    self.logger.debug("  Permissions ok (%s)" % permissions_new)
        sites_forbidden = []

        if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
            self.caf_access = True

        if self.caf_access == False:
            sites_forbidden.append("caf.cern.ch")

        all_t1 = [
            "cmssrm-fzk.gridka.de",
            "gridka-dCache.fzk.de",
            "srm-cms.gridpp.rl.ac.uk",
            "srm.grid.sinica.edu.tw",
            "srm2.grid.sinica.edu.tw",
            "storm-fe-cms.cr.cnaf.infn.it"
            ]

        country_codes = {
            "CAF" : "caf.cern.ch",
            "CH"  : "srm-cms.cern.ch",
            "FR"  : "ccsrm.in2p3.fr",
            "DE"  : "cmssrm-fzk.gridka.de",
            "GOV" : "cmssrm.fnal.gov",
            "DE2" : "gridka-dCache.fzk.de",
            "UK"  : "srm-cms.gridpp.rl.ac.uk",
            "TW"  : "srm.grid.sinica.edu.tw",
            "TW2" : "srm2.grid.sinica.edu.tw",
            "ES"  : "srmcms.pic.es",
            "IT"  : "storm-fe-cms.cr.cnaf.infn.it"
            }

        if self.non_t1access:
            sites_forbidden.extend(all_t1)

        for site in sites_forbidden:

        if self.preferred_site in country_codes:
            self.preferred_site = country_codes[self.preferred_site]

        if self.preferred_site != "no preference":
            if self.preferred_site in sites:
                sites = [self.preferred_site]

        while len(sites) > 0 and \

                    t1_sites.append(site)
                if site == "caf.cern.ch":
                    t1_sites.append(site)

            if len(t1_sites) > 0:
                se_name = choice(t1_sites)

                se_name = choice(sites)

            if self.sites_and_versions_cache.has_key(se_name) and \
               self.sites_and_versions_cache[se_name].has_key(cmssw_version):
                if self.sites_and_versions_cache[se_name][cmssw_version]:

                    self.logger.debug("  --> rejecting site `%s'" % se_name)
                    sites.remove(se_name)

                self.logger.info("Checking if site `%s' " \
                                 "has CMSSW version `%s'" % \
                                 (se_name, cmssw_version))
                self.sites_and_versions_cache[se_name] = {}

                cmd = "lcg-info --list-ce " \
                      "CEStatus=Production," \
                      (cmssw_version, se_name)
                (status, output) = commands.getstatusoutput(cmd)
                    self.logger.error("Could not check site information " \
                                      "for site `%s'" % se_name)
                if (len(output) > 0) or (se_name == "caf.cern.ch"):
                    self.sites_and_versions_cache[se_name][cmssw_version] = True

                    self.sites_and_versions_cache[se_name][cmssw_version] = False
                    self.logger.debug("  --> rejecting site `%s'" % se_name)
                    sites.remove(se_name)

        if site_name is self.no_matching_site_found_str:
            self.logger.error("  --> no matching site found")
            self.logger.error("  --> Your release or SCRAM " \
                              "architecture may not be available " \
                              "anywhere on the (LCG) grid.")
            self.logger.debug("  (command used: `%s')" % cmd)

            self.logger.debug("  --> selected site `%s'" % site_name)

        if site_name is None:
            site_name = self.no_matching_site_found_str
            self.all_sites_found = False
        parser = optparse.OptionParser(version="%s %s" % \
                                       ("%prog", self.version),

        self.option_parser = parser

        parser.add_option("-d", "--debug",
                          help="Switch on debug mode",
                          callback=self.option_handler_debug)

        parser.add_option("-q", "--quiet",
                          help="Be less verbose",
                          callback=self.option_handler_quiet)

        parser.add_option("", "--force",
                          help="Force mode. Do not abort on sanity check "
                          callback=self.option_handler_force)

        parser.add_option("", "--harvesting_type",
                          help="Harvesting type: %s" % \
                          ", ".join(self.harvesting_types),
                          callback=self.option_handler_harvesting_type,
                          metavar="HARVESTING_TYPE")

        parser.add_option("", "--harvesting_mode",
                          help="Harvesting mode: %s (default = %s)" % \
                          (", ".join(self.harvesting_modes),
                           self.harvesting_mode_default),
                          callback=self.option_handler_harvesting_mode,
                          metavar="HARVESTING_MODE")

        parser.add_option("", "--globaltag",
                          help="GlobalTag to use. Default is the ones " \
                          "the dataset was created with for MC, for data " \
                          "a GlobalTag has to be specified.",
                          callback=self.option_handler_globaltag,
                          metavar="GLOBALTAG")

        parser.add_option("", "--no-ref-hists",
                          help="Don't use any reference histograms",
                          callback=self.option_handler_no_ref_hists)

        parser.add_option("", "--frontier-connection",
                          help="Use this Frontier connection to find " \
                          "GlobalTags and LocalTags (for reference " \
                          "histograms).\nPlease only use this for " \
                          callback=self.option_handler_frontier_connection,

        parser.add_option("", "--frontier-connection-for-globaltag",
                          help="Use this Frontier connection to find " \
                          "GlobalTags.\nPlease only use this for " \
                          callback=self.option_handler_frontier_connection,

        parser.add_option("", "--frontier-connection-for-refhists",
                          help="Use this Frontier connection to find " \
                          "LocalTags (for reference " \
                          "histograms).\nPlease only use this for " \
                          callback=self.option_handler_frontier_connection,

        parser.add_option("", "--dataset",
                          help="Name (or regexp) of dataset(s) to process",
                          callback=self.option_handler_input_spec,

        parser.add_option("", "--dataset-ignore",
                          help="Name (or regexp) of dataset(s) to ignore",
                          callback=self.option_handler_input_spec,
                          metavar="DATASET-IGNORE")

        parser.add_option("", "--runs",
                          help="Run number(s) to process",
                          callback=self.option_handler_input_spec,

        parser.add_option("", "--runs-ignore",
                          help="Run number(s) to ignore",
                          callback=self.option_handler_input_spec,
                          metavar="RUNS-IGNORE")

        parser.add_option("", "--datasetfile",
                          help="File containing list of dataset names " \
                          "(or regexps) to process",
                          callback=self.option_handler_input_spec,
                          metavar="DATASETFILE")

        parser.add_option("", "--datasetfile-ignore",
                          help="File containing list of dataset names " \
                          "(or regexps) to ignore",
                          callback=self.option_handler_input_spec,
                          metavar="DATASETFILE-IGNORE")

        parser.add_option("", "--runslistfile",
                          help="File containing list of run numbers " \
                          callback=self.option_handler_input_spec,
                          metavar="RUNSLISTFILE")

        parser.add_option("", "--runslistfile-ignore",
                          help="File containing list of run numbers " \
                          callback=self.option_handler_input_spec,
                          metavar="RUNSLISTFILE-IGNORE")

        parser.add_option("", "--Jsonrunfile",
                          help="Jsonfile containing dictionary of run/lumisections pairs. " \
                          "All lumisections of runs contained in dictionary are processed.",
                          callback=self.option_handler_input_Jsonrunfile,
                          metavar="JSONRUNFILE")

        parser.add_option("", "--Jsonfile",
                          help="Jsonfile containing dictionary of run/lumisections pairs. " \
                          "Only specified lumisections of runs contained in dictionary are processed.",
                          callback=self.option_handler_input_Jsonfile,

        parser.add_option("", "--todo-file",
                          help="Todo file containing a list of runs to process.",
                          callback=self.option_handler_input_todofile,
                          metavar="TODO-FILE")

        parser.add_option("", "--refhistmappingfile",
                          help="File to be used for the reference " \
                          "histogram mappings. Default: `%s'." % \
                          self.ref_hist_mappings_file_name_default,
                          callback=self.option_handler_ref_hist_mapping_file,
                          metavar="REFHISTMAPPING-FILE")

        parser.add_option("", "--castordir",
                          help="Place on CASTOR to store results. " \
                          "Default: `%s'." % \
                          self.castor_base_dir_default,
                          callback=self.option_handler_castor_dir,
                          metavar="CASTORDIR")

        parser.add_option("", "--no-t1access",
                          help="Try to create jobs that will run " \
                          "without special `t1access' role",
                          callback=self.option_handler_no_t1access)

        parser.add_option("", "--caf-access",
                          help="Crab jobs may run " \
                          callback=self.option_handler_caf_access)

        parser.add_option("", "--saveByLumiSection",
                          help="Set saveByLumiSection=1 in harvesting cfg file",
                          callback=self.option_handler_saveByLumiSection)

        parser.add_option("", "--automatic-crab-submission",
                          help="Crab jobs are created and " \
                          "submitted automatically",
                          callback=self.option_handler_crab_submission)

        parser.add_option("", "--max-sites",
                          help="Max. number of sites each job is submitted to",
                          callback=self.option_handler_sites,

        parser.add_option("", "--site",
                          help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
                          srm-cms.cern.ch : CH \
                          ccsrm.in2p3.fr : FR \
                          cmssrm-fzk.gridka.de : DE \
                          cmssrm.fnal.gov : GOV \
                          gridka-dCache.fzk.de : DE2 \
                          srm-cms.gridpp.rl.ac.uk : UK \
                          srm.grid.sinica.edu.tw : TW \
                          srm2.grid.sinica.edu.tw : TW2 \
                          srmcms.pic.es : ES \
                          storm-fe-cms.cr.cnaf.infn.it : IT",
                          callback=self.option_handler_preferred_site,

        parser.add_option("-l", "--list",
                          help="List all harvesting types and their " \
                          "corresponding sequence names",
                          callback=self.option_handler_list_types)

        if len(self.cmd_line_opts) < 1:
            self.cmd_line_opts = ["--help"]

        for i in ["-d", "--debug",
            if i in self.cmd_line_opts:
                self.cmd_line_opts.remove(i)
                self.cmd_line_opts.insert(0, i)

        parser.set_defaults()
        (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2192 """Check completeness and correctness of input information.
2194 Check that all required information has been specified and
2195 that, at least as far as can be easily checked, it makes
2198 NOTE: This is also where any default values are applied.
2202 self.logger.info(
"Checking completeness/correctness of input...")
2206 if len(self.args) > 0:
2207 msg =
"Sorry but I don't understand `%s'" % \
2208 (
" ".
join(self.args))
2209 self.logger.fatal(msg)
2215 if self.harvesting_mode ==
"two-step":
2216 msg =
"--------------------\n" \
2217 " Sorry, but for the moment (well, till it works)" \
2218 " the two-step mode has been disabled.\n" \
2219 "--------------------\n"
2220 self.logger.fatal(msg)
2225 if self.harvesting_type
is None:
2226 msg =
"Please specify a harvesting type"
2227 self.logger.fatal(msg)
2230 if self.harvesting_mode
is None:
2231 self.harvesting_mode = self.harvesting_mode_default
2232 msg =
"No harvesting mode specified --> using default `%s'" % \
2233 self.harvesting_mode
2234 self.logger.warning(msg)
2240 if self.input_method[
"datasets"][
"use"]
is None:
2241 msg =
"Please specify an input dataset name " \
2242 "or a list file name"
2243 self.logger.fatal(msg)
2248 assert not self.input_name[
"datasets"][
"use"]
is None
2255 if self.use_ref_hists:
2256 if self.ref_hist_mappings_file_name
is None:
2257 self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2258 msg =
"No reference histogram mapping file specified --> " \
2259 "using default `%s'" % \
2260 self.ref_hist_mappings_file_name
2261 self.logger.warning(msg)
2267 if self.castor_base_dir
is None:
2268 self.castor_base_dir = self.castor_base_dir_default
2269 msg =
"No CASTOR area specified -> using default `%s'" % \
2270 self.castor_base_dir
2271 self.logger.warning(msg)
2275 if not self.castor_base_dir.startswith(self.castor_prefix):
2276 msg =
"CASTOR area does not start with `%s'" % \
2278 self.logger.fatal(msg)
2279 if self.castor_base_dir.find(
"castor") > -1
and \
2280 not self.castor_base_dir.find(
"cern.ch") > -1:
2281 self.logger.fatal(
"Only CERN CASTOR is supported")
2292 if self.globaltag
is None:
2293 self.logger.warning(
"No GlobalTag specified. This means I cannot")
2294 self.logger.warning(
"run on data, only on MC.")
2295 self.logger.warning(
"I will skip all data datasets.")
2300 if not self.globaltag
is None:
2301 if not self.globaltag.endswith(
"::All"):
2302 self.logger.warning(
"Specified GlobalTag `%s' does " \
2303 "not end in `::All' --> " \
2304 "appending this missing piece" % \
2306 self.globaltag =
"%s::All" % self.globaltag
2311 for (key, value)
in self.frontier_connection_name.iteritems():
2312 frontier_type_str =
"unknown"
2313 if key ==
"globaltag":
2314 frontier_type_str =
"the GlobalTag"
2315 elif key ==
"refhists":
2316 frontier_type_str =
"the reference histograms"
2318 if self.frontier_connection_overridden[key] ==
True:
2322 self.logger.info(
"Using %sdefault Frontier " \
2323 "connection for %s: `%s'" % \
2324 (non_str, frontier_type_str, value))
2333 """Check if CMSSW is setup.
2340 cmssw_version = os.getenv(
"CMSSW_VERSION")
2341 if cmssw_version
is None:
2342 self.logger.fatal(
"It seems CMSSW is not setup...")
2343 self.logger.fatal(
"($CMSSW_VERSION is empty)")
2344 raise Error(
"ERROR: CMSSW needs to be setup first!")
2346 self.cmssw_version = cmssw_version
2347 self.logger.info(
"Found CMSSW version %s properly set up" % \
2356 """Check if DBS is setup.
2363 dbs_home = os.getenv(
"DBSCMD_HOME")
2364 if dbs_home
is None:
2365 self.logger.fatal(
"It seems DBS is not setup...")
2366 self.logger.fatal(
" $DBSCMD_HOME is empty")
2367 raise Error(
"ERROR: DBS needs to be setup first!")
2385 self.logger.debug(
"Found DBS properly set up")
2393 """Setup the Python side of DBS.
2395 For more information see the DBS Python API documentation:
2396 https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2402 args[
"url"]=
"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2403 "servlet/DBSServlet"
2407 except DBSAPI.dbsApiException.DbsApiException, ex:
2408 self.logger.fatal(
"Caught DBS API exception %s: %s " % \
2409 (ex.getClassName(), ex.getErrorMessage()))
2410 if ex.getErrorCode()
not in (
None,
""):
2411 logger.debug(
"DBS exception error code: ", ex.getErrorCode())
2419 """Use DBS to resolve a wildcarded dataset name.
2425 assert not self.dbs_api
is None
2430 if not (dataset_name.startswith(
"/")
and \
2431 dataset_name.endswith(
"RECO")):
2432 self.logger.warning(
"Dataset name `%s' does not sound " \
2433 "like a valid dataset name!" % \
2439 dbs_query =
"find dataset where dataset like %s " \
2440 "and dataset.status = VALID" % \
2443 api_result = api.executeQuery(dbs_query)
2444 except DBSAPI.dbsApiException.DbsApiException:
2445 msg =
"ERROR: Could not execute DBS query"
2446 self.logger.fatal(msg)
2451 parser = xml.sax.make_parser()
2452 parser.setContentHandler(handler)
2456 xml.sax.parseString(api_result, handler)
2457 except SAXParseException:
2458 msg =
"ERROR: Could not parse DBS server output"
2459 self.logger.fatal(msg)
2463 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2467 datasets = handler.results.values()[0]
2475 """Ask DBS for the CMSSW version used to create this dataset.
2481 assert not self.dbs_api
is None
2485 dbs_query =
"find algo.version where dataset = %s " \
2486 "and dataset.status = VALID" % \
2489 api_result = api.executeQuery(dbs_query)
2490 except DBSAPI.dbsApiException.DbsApiException:
2491 msg =
"ERROR: Could not execute DBS query"
2492 self.logger.fatal(msg)
2496 parser = xml.sax.make_parser()
2497 parser.setContentHandler(handler)
2500 xml.sax.parseString(api_result, handler)
2501 except SAXParseException:
2502 msg =
"ERROR: Could not parse DBS server output"
2503 self.logger.fatal(msg)
2507 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2510 cmssw_version = handler.results.values()[0]
2513 assert len(cmssw_version) == 1
2516 cmssw_version = cmssw_version[0]
2519 return cmssw_version
2569 """Ask DBS for the list of runs in a given dataset.
2571 # NOTE: This does not (yet?) skip/remove empty runs. There is
2572 # a bug in the DBS entry run.numevents (i.e. it always returns
2573 # zero) which should be fixed in the `next DBS release'.
2575 # https://savannah.cern.ch/bugs/?53452
2576 # https://savannah.cern.ch/bugs/?53711
2587 assert not self.dbs_api
is None
2591 dbs_query =
"find run where dataset = %s " \
2592 "and dataset.status = VALID" % \
2595 api_result = api.executeQuery(dbs_query)
2596 except DBSAPI.dbsApiException.DbsApiException:
2597 msg =
"ERROR: Could not execute DBS query"
2598 self.logger.fatal(msg)
2602 parser = xml.sax.make_parser()
2603 parser.setContentHandler(handler)
2606 xml.sax.parseString(api_result, handler)
2607 except SAXParseException:
2608 msg =
"ERROR: Could not parse DBS server output"
2609 self.logger.fatal(msg)
2613 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2616 runs = handler.results.values()[0]
2618 runs = [int(i)
for i
in runs]
2627 """Ask DBS for the globaltag corresponding to a given dataset.
2630 # This does not seem to work for data datasets? E.g. for
2631 # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
2632 # Probaly due to the fact that the GlobalTag changed during
2640 assert not self.dbs_api
is None
2644 dbs_query =
"find dataset.tag where dataset = %s " \
2645 "and dataset.status = VALID" % \
2648 api_result = api.executeQuery(dbs_query)
2649 except DBSAPI.dbsApiException.DbsApiException:
2650 msg =
"ERROR: Could not execute DBS query"
2651 self.logger.fatal(msg)
2655 parser = xml.sax.make_parser()
2656 parser.setContentHandler(parser)
2659 xml.sax.parseString(api_result, handler)
2660 except SAXParseException:
2661 msg =
"ERROR: Could not parse DBS server output"
2662 self.logger.fatal(msg)
2666 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2669 globaltag = handler.results.values()[0]
2672 assert len(globaltag) == 1
2675 globaltag = globaltag[0]
2683 """Ask DBS for the the data type (data or mc) of a given
2690 assert not self.dbs_api
is None
2694 dbs_query =
"find datatype.type where dataset = %s " \
2695 "and dataset.status = VALID" % \
2698 api_result = api.executeQuery(dbs_query)
2699 except DBSAPI.dbsApiException.DbsApiException:
2700 msg =
"ERROR: Could not execute DBS query"
2701 self.logger.fatal(msg)
2705 parser = xml.sax.make_parser()
2706 parser.setContentHandler(handler)
2709 xml.sax.parseString(api_result, handler)
2710 except SAXParseException:
2711 msg =
"ERROR: Could not parse DBS server output"
2712 self.logger.fatal(msg)
2716 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2719 datatype = handler.results.values()[0]
2722 assert len(datatype) == 1
2725 datatype = datatype[0]
2736 """Determine the number of events in a given dataset (and run).
2738 Ask DBS for the number of events in a dataset. If a run number
2739 is specified the number of events returned is that in that run
2740 of that dataset. If problems occur we throw an exception.
2743 # Since DBS does not return the number of events correctly,
2744 # neither for runs nor for whole datasets, we have to work
2745 # around that a bit...
2752 assert not self.dbs_api
is None
2756 dbs_query =
"find file.name, file.numevents where dataset = %s " \
2757 "and dataset.status = VALID" % \
2759 if not run_number
is None:
2760 dbs_query = dbq_query + (
" and run = %d" % run_number)
2762 api_result = api.executeQuery(dbs_query)
2763 except DBSAPI.dbsApiException.DbsApiException:
2764 msg =
"ERROR: Could not execute DBS query"
2765 self.logger.fatal(msg)
2769 parser = xml.sax.make_parser()
2770 parser.setContentHandler(handler)
2773 xml.sax.parseString(api_result, handler)
2774 except SAXParseException:
2775 msg =
"ERROR: Could not parse DBS server output"
2776 self.logger.fatal(msg)
2780 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
2783 num_events = sum(handler.results[
"file.numevents"])
3077 """Figure out the number of events in each run of this dataset.
3079 This is a more efficient way of doing this than calling
3080 dbs_resolve_number_of_events for each run.
3084 self.logger.debug(
"Checking spread of dataset `%s'" % dataset_name)
3088 assert not self.dbs_api
is None
3092 dbs_query =
"find run.number, site, file.name, file.numevents " \
3093 "where dataset = %s " \
3094 "and dataset.status = VALID" % \
3097 api_result = api.executeQuery(dbs_query)
3098 except DBSAPI.dbsApiException.DbsApiException:
3099 msg =
"ERROR: Could not execute DBS query"
3100 self.logger.fatal(msg)
3103 handler =
DBSXMLHandler([
"run.number",
"site",
"file.name",
"file.numevents"])
3104 parser = xml.sax.make_parser()
3105 parser.setContentHandler(handler)
3148 xml.sax.parseString(api_result, handler)
3149 except SAXParseException:
3150 msg =
"ERROR: Could not parse DBS server output"
3151 self.logger.fatal(msg)
3155 assert(handler.check_results_validity()),
"ERROR The DBSXMLHandler screwed something up!"
3162 for (index, site_name)
in enumerate(handler.results[
"site"]):
3172 if len(site_name) < 1:
3174 run_number = int(handler.results[
"run.number"][index])
3175 file_name = handler.results[
"file.name"][index]
3176 nevents = int(handler.results[
"file.numevents"][index])
3179 if not files_info.has_key(run_number):
3181 files_info[run_number] = {}
3182 files_info[run_number][file_name] = (nevents,
3184 elif not files_info[run_number].has_key(file_name):
3186 files_info[run_number][file_name] = (nevents,
3193 assert nevents == files_info[run_number][file_name][0]
3195 files_info[run_number][file_name][1].
append(site_name)
3200 for run_number
in files_info.keys():
3201 files_without_sites = [i
for (i, j)
in \
3202 files_info[run_number].items() \
3204 if len(files_without_sites) > 0:
3205 self.logger.warning(
"Removing %d file(s)" \
3206 " with empty site names" % \
3207 len(files_without_sites))
3208 for file_name
in files_without_sites:
3209 del files_info[run_number][file_name]
3215 num_events_catalog = {}
3216 for run_number
in files_info.keys():
3217 site_names =
list(set([j
for i
in files_info[run_number].
values()
for j
in i[1]]))
3223 if len(site_names) > 1:
3231 all_file_names = files_info[run_number].
keys()
3232 all_file_names = set(all_file_names)
3233 sites_with_complete_copies = []
3234 for site_name
in site_names:
3235 files_at_site = [i
for (i, (j, k)) \
3236 in files_info[run_number].items() \
3238 files_at_site = set(files_at_site)
3239 if files_at_site == all_file_names:
3240 sites_with_complete_copies.append(site_name)
3241 if len(sites_with_complete_copies) < 1:
3247 if len(sites_with_complete_copies) > 1:
3266 self.logger.debug(
" -> run appears to be `mirrored'")
3268 self.logger.debug(
" -> run appears to be spread-out")
3271 len(sites_with_complete_copies) != len(site_names):
3275 for (file_name, (i, sites))
in files_info[run_number].items():
3276 complete_sites = [site
for site
in sites \
3277 if site
in sites_with_complete_copies]
3278 files_info[run_number][file_name] = (i, complete_sites)
3279 site_names = sites_with_complete_copies
3281 self.logger.debug(
" for run #%d:" % run_number)
3282 num_events_catalog[run_number] = {}
3283 num_events_catalog[run_number][
"all_sites"] = sum([i[0]
for i
in files_info[run_number].
values()])
3284 if len(site_names) < 1:
3285 self.logger.debug(
" run is not available at any site")
3286 self.logger.debug(
" (but should contain %d events" % \
3287 num_events_catalog[run_number][
"all_sites"])
3289 self.logger.debug(
" at all sites combined there are %d events" % \
3290 num_events_catalog[run_number][
"all_sites"])
3291 for site_name
in site_names:
3292 num_events_catalog[run_number][site_name] = sum([i[0]
for i
in files_info[run_number].
values()
if site_name
in i[1]])
3293 self.logger.debug(
" at site `%s' there are %d events" % \
3294 (site_name, num_events_catalog[run_number][site_name]))
3295 num_events_catalog[run_number][
"mirrored"] = mirrored
3298 return num_events_catalog
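        # Sketch of the returned structure, based on the keys filled in
        # above:
        #
        #   num_events_catalog = {
        #       run_number : {
        #           "all_sites" : <total number of events in this run>,
        #           <site_name> : <number of events available at this site>,
        #           "mirrored"  : True/False,
        #           },
        #       ...
        #       }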
3358 """Build a list of all datasets to be processed.
3367 if input_method
is None:
3369 elif input_method ==
"dataset":
3374 self.logger.info(
"Asking DBS for dataset names")
3375 dataset_names = self.dbs_resolve_dataset_name(input_name)
3376 elif input_method ==
"datasetfile":
3381 self.logger.info(
"Reading input from list file `%s'" % \
3384 listfile = open(
"/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name,
"r")
3385 print "open listfile"
3386 for dataset
in listfile:
3388 dataset_stripped = dataset.strip()
3389 if len(dataset_stripped) < 1:
3392 if dataset_stripped[0] !=
"#":
3393 dataset_names.extend(self. \
3397 msg =
"ERROR: Could not open input list file `%s'" % \
3399 self.logger.fatal(msg)
3404 assert False,
"Unknown input method `%s'" % input_method
3411 dataset_names =
list(set(dataset_names))
3414 dataset_names.sort()
3417 return dataset_names
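        # Example (the dataset pattern is purely illustrative): a call with
        # input_method "dataset" and input_name "/RelVal*/*CMSSW_3_8_*/RECO"
        # asks DBS to expand the wildcard and returns the sorted,
        # duplicate-free list of matching dataset names.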
3422 """Build a list of datasets to process.
3426 self.logger.info(
"Building list of datasets to consider...")
3428 input_method = self.input_method[
"datasets"][
"use"]
3429 input_name = self.input_name[
"datasets"][
"use"]
3430 dataset_names = self.build_dataset_list(input_method,
3433 [
None] * len(dataset_names)))
3435 self.logger.info(
" found %d dataset(s) to process:" % \
3437 for dataset
in dataset_names:
3438 self.logger.info(
" `%s'" % dataset)
3445 """Build a list of datasets to ignore.
3447 NOTE: We should always have a list of datasets to process, but
3448 it may be that we don't have a list of datasets to ignore.
3452 self.logger.info(
"Building list of datasets to ignore...")
3454 input_method = self.input_method[
"datasets"][
"ignore"]
3455 input_name = self.input_name[
"datasets"][
"ignore"]
3456 dataset_names = self.build_dataset_list(input_method,
3459 [
None] * len(dataset_names)))
3461 self.logger.info(
" found %d dataset(s) to ignore:" % \
3463 for dataset
in dataset_names:
3464 self.logger.info(
" `%s'" % dataset)
        if input_method is None:

        elif input_method == "runs":

            self.logger.info("Reading list of runs from the " \
            runs.extend([int(i.strip()) \
                         for i in input_name.split(",") \
                         if len(i.strip()) > 0])

        elif input_method == "runslistfile":

            self.logger.info("Reading list of runs from file `%s'" % \

                listfile = open(input_name, "r")
                for run in listfile:
                    run_stripped = run.strip()
                    if len(run_stripped) < 1:
                    if run_stripped[0] != "#":
                        runs.append(int(run_stripped))

                msg = "ERROR: Could not open input list file `%s'" % \
                self.logger.fatal(msg)

            assert False, "Unknown input method `%s'" % input_method

        runs = list(set(runs))
3522 """Build a list of runs to process.
3526 self.logger.info(
"Building list of runs to consider...")
3528 input_method = self.input_method[
"runs"][
"use"]
3529 input_name = self.input_name[
"runs"][
"use"]
3530 runs = self.build_runs_list(input_method, input_name)
3531 self.runs_to_use =
dict(
zip(runs, [
None] * len(runs)))
3533 self.logger.info(
" found %d run(s) to process:" % \
3536 self.logger.info(
" %s" %
", ".
join([str(i)
for i
in runs]))
3543 """Build a list of runs to ignore.
3545 NOTE: We should always have a list of runs to process, but
3546 it may be that we don't have a list of runs to ignore.
3550 self.logger.info(
"Building list of runs to ignore...")
3552 input_method = self.input_method[
"runs"][
"ignore"]
3553 input_name = self.input_name[
"runs"][
"ignore"]
3554 runs = self.build_runs_list(input_method, input_name)
3555 self.runs_to_ignore =
dict(
zip(runs, [
None] * len(runs)))
3557 self.logger.info(
" found %d run(s) to ignore:" % \
3560 self.logger.info(
" %s" %
", ".
join([str(i)
for i
in runs]))
3567 """Update the list of datasets taking into account the ones to
3570 Both lists have been generated before from DBS and both are
3571 assumed to be unique.
3573 NOTE: The advantage of creating the ignore list from DBS (in
3574 case a regexp is given) and matching that instead of directly
3575 matching the ignore criterion against the list of datasets (to
3576 consider) built from DBS is that in the former case we're sure
3577 that all regexps are treated exactly as DBS would have done
3578 without the cmsHarvester.
3580 NOTE: This only removes complete samples. Exclusion of single
3581 runs is done by the book keeping. So the assumption is that a
3582 user never wants to harvest just part (i.e. n out of N runs)
3587 self.logger.info(
"Processing list of datasets to ignore...")
3589 self.logger.debug(
"Before processing ignore list there are %d " \
3590 "datasets in the list to be processed" % \
3591 len(self.datasets_to_use))
3594 dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3595 for dataset_name
in self.datasets_to_use.keys():
3596 if dataset_name
in self.datasets_to_ignore.keys():
3597 del dataset_names_filtered[dataset_name]
3599 self.logger.info(
" --> Removed %d dataset(s)" % \
3600 (len(self.datasets_to_use) -
3601 len(dataset_names_filtered)))
3603 self.datasets_to_use = dataset_names_filtered
3605 self.logger.debug(
"After processing ignore list there are %d " \
3606 "datasets in the list to be processed" % \
3607 len(self.datasets_to_use))
        self.logger.info("Processing list of runs to use and ignore...")

        runs_to_use = self.runs_to_use
        runs_to_ignore = self.runs_to_ignore

        for dataset_name in self.datasets_to_use:
            runs_in_dataset = self.datasets_information[dataset_name]["runs"]

            runs_to_use_tmp = []
            for run in runs_to_use:
                if not run in runs_in_dataset:
                    self.logger.warning("Dataset `%s' does not contain " \
                                        "requested run %d " \
                                        "--> ignoring `use' of this run" % \
                                        (dataset_name, run))

                    runs_to_use_tmp.append(run)

            if len(runs_to_use) > 0:
                runs = runs_to_use_tmp
                self.logger.info("Using %d out of %d runs " \
                                 "of dataset `%s'" % \
                                 (len(runs), len(runs_in_dataset),

                runs = runs_in_dataset

            if len(runs_to_ignore) > 0:
                    if not run in runs_to_ignore:
                        runs_tmp.append(run)
                self.logger.info("Ignoring %d out of %d runs " \
                                 "of dataset `%s'" % \
                                 (len(runs) - len(runs_tmp),
                                  len(runs_in_dataset),

            if self.todofile != "YourToDofile.txt":

                print "Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" % self.todofile
                cmd = "grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" % (dataset_name, self.todofile)
                (status, output) = commands.getstatusoutput(cmd)
                    if run_str in output:
                        runs_todo.append(run)
                self.logger.info("Using %d runs " \
                                 "of dataset `%s'" % \

            if self.Jsonfilename != "YourJSON.txt":

                self.Jsonlumi = True

                self.logger.info("Reading runs and lumisections from file `%s'" % \
                    Jsonfile = open(self.Jsonfilename, "r")
                    for names in Jsonfile:
                        dictNames = eval(str(names))
                        for key in dictNames:
                            Json_runs.append(intkey)

                    msg = "ERROR: Could not open Jsonfile `%s'" % \
                    self.logger.fatal(msg)

                    if run in Json_runs:
                        good_runs.append(run)
                self.logger.info("Using %d runs " \
                                 "of dataset `%s'" % \

            if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):

                self.logger.info("Reading runs from file `%s'" % \
                                 self.Jsonrunfilename)

                    Jsonfile = open(self.Jsonrunfilename, "r")
                    for names in Jsonfile:
                        dictNames = eval(str(names))
                        for key in dictNames:
                            Json_runs.append(intkey)

                    msg = "ERROR: Could not open Jsonfile `%s'" % \
                    self.logger.fatal(msg)

                    if run in Json_runs:
                        good_runs.append(run)
                self.logger.info("Using %d runs " \
                                 "of dataset `%s'" % \

            self.datasets_to_use[dataset_name] = runs
3743 """Remove all but the largest part of all datasets.
3745 This allows us to harvest at least part of these datasets
3746 using single-step harvesting until the two-step approach
3752 assert self.harvesting_mode ==
"single-step-allow-partial"
3755 for dataset_name
in self.datasets_to_use:
3756 for run_number
in self.datasets_information[dataset_name][
"runs"]:
3757 max_events =
max(self.datasets_information[dataset_name][
"sites"][run_number].
values())
3758 sites_with_max_events = [i[0]
for i
in self.datasets_information[dataset_name][
"sites"][run_number].items()
if i[1] == max_events]
3759 self.logger.warning(
"Singlifying dataset `%s', " \
3761 (dataset_name, run_number))
3762 cmssw_version = self.datasets_information[dataset_name] \
3764 selected_site = self.pick_a_site(sites_with_max_events,
3768 nevents_old = self.datasets_information[dataset_name][
"num_events"][run_number]
3769 self.logger.warning(
" --> " \
3770 "only harvesting partial statistics: " \
3771 "%d out of %d events (5.1%f%%) " \
3775 100. * max_events / nevents_old,
3777 self.logger.warning(
"!!! Please note that the number of " \
3778 "events in the output path name will " \
3779 "NOT reflect the actual statistics in " \
3780 "the harvested results !!!")
3787 self.datasets_information[dataset_name][
"sites"][run_number] = {selected_site: max_events}
3788 self.datasets_information[dataset_name][
"num_events"][run_number] = max_events
3796 """Check list of dataset names for impossible ones.
3798 Two kinds of checks are done:
3799 - Checks for things that do not make sense. These lead to
3800 errors and skipped datasets.
3801 - Sanity checks. For these warnings are issued but the user is
3802 considered to be the authoritative expert.
3805 - The CMSSW version encoded in the dataset name should match
3806 self.cmssw_version. This is critical.
3807 - There should be some events in the dataset/run. This is
3808 critical in the sense that CRAB refuses to create jobs for
3809 zero events. And yes, this does happen in practice. E.g. the
3810 reprocessed CRAFT08 datasets contain runs with zero events.
3811 - A cursory check is performed to see if the harvesting type
3812 makes sense for the data type. This should prevent the user
3813 from inadvertently running RelVal for data.
3814 - It is not possible to run single-step harvesting jobs on
3815 samples that are not fully contained at a single site.
3816 - Each dataset/run has to be available at at least one site.
3820 self.logger.info(
"Performing sanity checks on dataset list...")
3822 dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3824 for dataset_name
in self.datasets_to_use.keys():
3827 version_from_dataset = self.datasets_information[dataset_name] \
3829 if version_from_dataset != self.cmssw_version:
3830 msg =
" CMSSW version mismatch for dataset `%s' " \
3833 self.cmssw_version, version_from_dataset)
3834 if self.force_running:
3836 self.logger.warning(
"%s " \
3837 "--> `force mode' active: " \
3840 del dataset_names_after_checks[dataset_name]
3841 self.logger.warning(
"%s " \
3842 "--> skipping" % msg)
3853 datatype = self.datasets_information[dataset_name][
"datatype"]
3854 if datatype ==
"data":
3856 if self.harvesting_type !=
"DQMOffline":
3858 elif datatype ==
"mc":
3859 if self.harvesting_type ==
"DQMOffline":
3863 assert False,
"ERROR Impossible data type `%s' " \
3864 "for dataset `%s'" % \
3865 (datatype, dataset_name)
3867 msg =
" Normally one does not run `%s' harvesting " \
3868 "on %s samples, are you sure?" % \
3869 (self.harvesting_type, datatype)
3870 if self.force_running:
3871 self.logger.warning(
"%s " \
3872 "--> `force mode' active: " \
3875 del dataset_names_after_checks[dataset_name]
3876 self.logger.warning(
"%s " \
3877 "--> skipping" % msg)
3891 if datatype ==
"data":
3892 if self.globaltag
is None:
3893 msg =
"For data datasets (like `%s') " \
3894 "we need a GlobalTag" % \
3896 del dataset_names_after_checks[dataset_name]
3897 self.logger.warning(
"%s " \
3898 "--> skipping" % msg)
3908 globaltag = self.datasets_information[dataset_name][
"globaltag"]
3909 if not globaltag
in self.globaltag_check_cache:
3910 if self.check_globaltag(globaltag):
3911 self.globaltag_check_cache.append(globaltag)
3913 msg =
"Something is wrong with GlobalTag `%s' " \
3914 "used by dataset `%s'!" % \
3915 (globaltag, dataset_name)
3916 if self.use_ref_hists:
3917 msg +=
"\n(Either it does not exist or it " \
3918 "does not contain the required key to " \
3919 "be used with reference histograms.)"
3921 msg +=
"\n(It probably just does not exist.)"
3922 self.logger.fatal(msg)
3928 runs_without_sites = [i
for (i, j)
in \
3929 self.datasets_information[dataset_name] \
3932 i
in self.datasets_to_use[dataset_name]]
3933 if len(runs_without_sites) > 0:
3934 for run_without_sites
in runs_without_sites:
3936 dataset_names_after_checks[dataset_name].
remove(run_without_sites)
3939 self.logger.warning(
" removed %d unavailable run(s) " \
3940 "from dataset `%s'" % \
3941 (len(runs_without_sites), dataset_name))
3942 self.logger.debug(
" (%s)" % \
3943 ", ".
join([str(i)
for i
in \
3944 runs_without_sites]))
3950 if not self.harvesting_mode ==
"two-step":
3951 for run_number
in self.datasets_to_use[dataset_name]:
3956 num_sites = len(self.datasets_information[dataset_name] \
3957 [
"sites"][run_number])
3958 if num_sites > 1
and \
3959 not self.datasets_information[dataset_name] \
3960 [
"mirrored"][run_number]:
3964 msg =
" Dataset `%s', run %d is spread across more " \
3965 "than one site.\n" \
3966 " Cannot run single-step harvesting on " \
3967 "samples spread across multiple sites" % \
3968 (dataset_name, run_number)
3970 dataset_names_after_checks[dataset_name].
remove(run_number)
3973 self.logger.warning(
"%s " \
3974 "--> skipping" % msg)
3983 tmp = [j
for (i, j)
in self.datasets_information \
3984 [dataset_name][
"num_events"].items() \
3985 if i
in self.datasets_to_use[dataset_name]]
3986 num_events_dataset = sum(tmp)
3988 if num_events_dataset < 1:
3989 msg =
" dataset `%s' is empty" % dataset_name
3990 del dataset_names_after_checks[dataset_name]
3991 self.logger.warning(
"%s " \
3992 "--> skipping" % msg)
4005 self.datasets_information[dataset_name] \
4006 [
"num_events"].items()
if i[1] < 1]
4007 tmp = [i
for i
in tmp
if i[0]
in self.datasets_to_use[dataset_name]]
4009 if len(empty_runs) > 0:
4010 for empty_run
in empty_runs:
4012 dataset_names_after_checks[dataset_name].
remove(empty_run)
4015 self.logger.info(
" removed %d empty run(s) from dataset `%s'" % \
4016 (len(empty_runs), dataset_name))
4017 self.logger.debug(
" (%s)" % \
4018 ", ".
join([str(i)
for i
in empty_runs]))
4024 dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4025 for (dataset_name, runs)
in dataset_names_after_checks.iteritems():
4027 self.logger.warning(
" Removing dataset without any runs " \
4030 del dataset_names_after_checks_tmp[dataset_name]
4031 dataset_names_after_checks = dataset_names_after_checks_tmp
4035 self.logger.warning(
" --> Removed %d dataset(s)" % \
4036 (len(self.datasets_to_use) -
4037 len(dataset_names_after_checks)))
4040 self.datasets_to_use = dataset_names_after_checks
4047 """Escape a DBS dataset name.
4049 Escape a DBS dataset name such that it does not cause trouble
4050 with the file system. This means turning each `/' into `__',
4051 except for the first one which is just removed.
4055 escaped_dataset_name = dataset_name
4056 escaped_dataset_name = escaped_dataset_name.strip(
"/")
4057 escaped_dataset_name = escaped_dataset_name.replace(
"/",
"__")
4059 return escaped_dataset_name
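
    # Example (illustration only, dataset name made up):
    #   "/Cosmics/Run2010A-v1/RECO" --> "Cosmics__Run2010A-v1__RECO"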
4066 """Generate the name of the configuration file to be run by
4069 Depending on the harvesting mode (single-step or two-step)
4070 this is the name of the real harvesting configuration or the
4071 name of the first-step ME summary extraction configuration.
4075 if self.harvesting_mode ==
"single-step":
4076 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4077 elif self.harvesting_mode ==
"single-step-allow-partial":
4078 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4085 elif self.harvesting_mode ==
"two-step":
4086 config_file_name = self.create_me_summary_config_file_name(dataset_name)
4088 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4089 self.harvesting_mode
4092 return config_file_name
4098 "Generate the name to be used for the harvesting config file."
4100 file_name_base =
"harvesting.py"
4101 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4102 config_file_name = file_name_base.replace(
".py",
4104 dataset_name_escaped)
4107 return config_file_name
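
    # Example (illustration only, using the made-up dataset name from
    # above): the harvesting config for "/Cosmics/Run2010A-v1/RECO"
    # would be called "harvesting_Cosmics__Run2010A-v1__RECO.py".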
4112 "Generate the name of the ME summary extraction config file."
4114 file_name_base =
"me_extraction.py"
4115 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4116 config_file_name = file_name_base.replace(
".py",
4118 dataset_name_escaped)
4121 return config_file_name
4126 """Create the name of the output file name to be used.
4128 This is the name of the output file of the `first step'. In
4129 the case of single-step harvesting this is already the final
4130 harvesting output ROOT file. In the case of two-step
4131 harvesting it is the name of the intermediary ME summary
4144 if self.harvesting_mode ==
"single-step":
4146 assert not run_number
is None
4148 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4149 elif self.harvesting_mode ==
"single-step-allow-partial":
4151 assert not run_number
is None
4153 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4154 elif self.harvesting_mode ==
"two-step":
4156 assert run_number
is None
4158 output_file_name = self.create_me_summary_output_file_name(dataset_name)
4161 assert False,
"ERROR Unknown harvesting mode `%s'" % \
4162 self.harvesting_mode
4165 return output_file_name
4170 """Generate the name to be used for the harvesting output file.
4172 This harvesting output file is the _final_ ROOT output file
4173 containing the harvesting results. In case of two-step
4174 harvesting there is an intermediate ME output file as well.
4178 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4186 output_file_name =
"DQM_V0001_R%09d__%s.root" % \
4187 (run_number, dataset_name_escaped)
4188 if self.harvesting_mode.find(
"partial") > -1:
4191 if self.datasets_information[dataset_name] \
4192 [
"mirrored"][run_number] ==
False:
4193 output_file_name = output_file_name.replace(
".root", \
4197 return output_file_name
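
    # Example (illustration only, run number and dataset made up):
    #   run 143179 of "/Cosmics/Run2010A-v1/RECO" gives
    #   "DQM_V0001_R000143179__Cosmics__Run2010A-v1__RECO.root"
    #   (with an additional marker appended in partial-harvesting mode).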
4202 """Generate the name of the intermediate ME file name to be
4203 used in two-step harvesting.
4207 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4208 output_file_name =
"me_summary_%s.root" % \
4209 dataset_name_escaped
4212 return output_file_name
4217 """Create the block name to use for this dataset/run number.
4219 This is what appears in the brackets `[]' in multicrab.cfg. It
4220 is used as the name of the job and to create output
4225 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4226 block_name =
"%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4234 """Create a CRAB configuration for a given job.
4236 NOTE: This is _not_ a complete (as in: submittable) CRAB
4237 configuration. It is used to store the common settings for the
4238 multicrab configuration.
4240 NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4242 NOTE: According to CRAB, you `Must define exactly two of
4243 total_number_of_events, events_per_job, or
4244 number_of_jobs.'. For single-step harvesting we force one job,
4245 for the rest we don't really care.
4248 # With the current version of CRAB (2.6.1), in which Daniele
4249 # fixed the behaviour of no_block_boundary for me, one _has to
4250 # specify_ the total_number_of_events and one single site in
4251 # the se_white_list.
4259 castor_prefix = self.castor_prefix
4261 tmp.append(self.config_file_header())
4266 tmp.append(
"[CRAB]")
4267 tmp.append(
"jobtype = cmssw")
4272 tmp.append(
"[GRID]")
4273 tmp.append(
"virtual_organization=cms")
4278 tmp.append(
"[USER]")
4279 tmp.append(
"copy_data = 1")
4284 tmp.append(
"[CMSSW]")
4285 tmp.append(
"# This reveals data hosted on T1 sites,")
4286 tmp.append(
"# which is normally hidden by CRAB.")
4287 tmp.append(
"show_prod = 1")
4288 tmp.append(
"number_of_jobs = 1")
4289 if self.Jsonlumi ==
True:
4290 tmp.append(
"lumi_mask = %s" % self.Jsonfilename)
4291 tmp.append(
"total_number_of_lumis = -1")
4293 if self.harvesting_type ==
"DQMOffline":
4294 tmp.append(
"total_number_of_lumis = -1")
4296 tmp.append(
"total_number_of_events = -1")
4297 if self.harvesting_mode.find(
"single-step") > -1:
4298 tmp.append(
"# Force everything to run in one job.")
4299 tmp.append(
"no_block_boundary = 1")
4314 """Create a multicrab.cfg file for all samples.
4316 This creates the contents for a multicrab.cfg file that uses
4317 the crab.cfg file (generated elsewhere) for the basic settings
4318 and contains blocks for each run of each dataset.
4321 # The fact that it's necessary to specify the se_white_list
4322 # and the total_number_of_events is due to our use of CRAB
4323 # version 2.6.1. This should no longer be necessary in the
4329 cmd=
"who i am | cut -f1 -d' '"
4330 (status, output)=commands.getstatusoutput(cmd)
4333 if self.caf_access ==
True:
4334 print "Extracting %s as user name" %UserName
4336 number_max_sites = self.nr_max_sites + 1
4338 multicrab_config_lines = []
4339 multicrab_config_lines.append(self.config_file_header())
4340 multicrab_config_lines.append(
"")
4341 multicrab_config_lines.append(
"[MULTICRAB]")
4342 multicrab_config_lines.append(
"cfg = crab.cfg")
4343 multicrab_config_lines.append(
"")
4345 dataset_names = self.datasets_to_use.keys()
4346 dataset_names.sort()
4348 for dataset_name
in dataset_names:
4349 runs = self.datasets_to_use[dataset_name]
4350 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4351 castor_prefix = self.castor_prefix
4356 castor_dir = self.datasets_information[dataset_name] \
4357 [
"castor_path"][run]
4359 cmd =
"rfdir %s" % castor_dir
4360 (status, output) = commands.getstatusoutput(cmd)
4362 if len(output) <= 0:
4368 assert (len(self.datasets_information[dataset_name] \
4369 [
"sites"][run]) == 1)
or \
4370 self.datasets_information[dataset_name][
"mirrored"]
4373 site_names = self.datasets_information[dataset_name] \
4374 [
"sites"][run].
keys()
4376 for i
in range(1, number_max_sites, 1):
4377 if len(site_names) > 0:
4378 index =
"site_%02d" % (i)
4380 config_file_name = self. \
4382 output_file_name = self. \
4393 if len(site_names) > 1:
4394 cmssw_version = self.datasets_information[dataset_name] \
4396 self.logger.info(
"Picking site for mirrored dataset " \
4398 (dataset_name, run))
4399 site_name = self.pick_a_site(site_names, cmssw_version)
4400 if site_name
in site_names:
4401 site_names.remove(site_name)
4404 site_name = site_names[0]
4405 site_names.remove(site_name)
4407 if site_name
is self.no_matching_site_found_str:
4411 nevents = self.datasets_information[dataset_name][
"num_events"][run]
4414 multicrab_block_name = self.create_multicrab_block_name( \
4415 dataset_name, run, index)
4416 multicrab_config_lines.append(
"[%s]" % \
4417 multicrab_block_name)
4421 if site_name ==
"caf.cern.ch":
4422 multicrab_config_lines.append(
"CRAB.use_server=0")
4423 multicrab_config_lines.append(
"CRAB.scheduler=caf")
4425 multicrab_config_lines.append(
"scheduler = glite")
4429 if site_name ==
"caf.cern.ch":
4432 multicrab_config_lines.append(
"GRID.se_white_list = %s" % \
4434 multicrab_config_lines.append(
"# This removes the default blacklisting of T1 sites.")
4435 multicrab_config_lines.append(
"GRID.remove_default_blacklist = 1")
4436 multicrab_config_lines.append(
"GRID.rb = CERN")
4437 if not self.non_t1access:
4438 multicrab_config_lines.append(
"GRID.role = t1access")
4443 castor_dir = castor_dir.replace(castor_prefix,
"")
4444 multicrab_config_lines.append(
"USER.storage_element=srm-cms.cern.ch")
4445 multicrab_config_lines.append(
"USER.user_remote_dir = %s" % \
4447 multicrab_config_lines.append(
"USER.check_user_remote_dir=0")
4449 if site_name ==
"caf.cern.ch":
4450 multicrab_config_lines.append(
"USER.storage_path=%s" % castor_prefix)
4456 multicrab_config_lines.append(
"USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4463 multicrab_config_lines.append(
"CMSSW.pset = %s" % \
4465 multicrab_config_lines.append(
"CMSSW.datasetpath = %s" % \
4467 multicrab_config_lines.append(
"CMSSW.runselection = %d" % \
4470 if self.Jsonlumi ==
True:
4473 if self.harvesting_type ==
"DQMOffline":
4476 multicrab_config_lines.append(
"CMSSW.total_number_of_events = %d" % \
4479 multicrab_config_lines.append(
"CMSSW.output_file = %s" % \
4484 if site_name ==
"caf.cern.ch":
4485 multicrab_config_lines.append(
"CAF.queue=cmscaf1nd")
4489 multicrab_config_lines.append(
"")
4493 self.all_sites_found =
True
4495 multicrab_config =
"\n".
join(multicrab_config_lines)
4498 return multicrab_config
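
    # For illustration only (site and dataset names made up): each
    # dataset/run ends up as one multicrab.cfg block along the lines of
    #
    #   [Cosmics__Run2010A-v1__RECO_000143179_site_01]
    #   scheduler = glite
    #   GRID.se_white_list = srm.example-t2.org
    #   GRID.remove_default_blacklist = 1
    #   GRID.rb = CERN
    #   USER.storage_element=srm-cms.cern.ch
    #   USER.user_remote_dir = <CASTOR path without the prefix>
    #   CMSSW.pset = harvesting_Cosmics__Run2010A-v1__RECO.py
    #   CMSSW.datasetpath = /Cosmics/Run2010A-v1/RECO
    #   CMSSW.runselection = 143179
    #   CMSSW.output_file = DQM_V0001_R000143179__Cosmics__Run2010A-v1__RECO.root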
4503 """Check if globaltag exists.
4505 Check if globaltag exists as GlobalTag in the database given
4506 by self.frontier_connection_name['globaltag']. If globaltag is
4507 None, self.globaltag is used instead.
4509 If we're going to use reference histograms this method also
4510 checks for the existence of the required key in the GlobalTag.
4514 if globaltag
is None:
4515 globaltag = self.globaltag
4518 if globaltag.endswith(
"::All"):
4519 globaltag = globaltag[:-5]
4521 connect_name = self.frontier_connection_name[
"globaltag"]
4529 connect_name = connect_name.replace(
"frontier://",
4530 "frontier://cmsfrontier:8000/")
4532 connect_name += self.db_account_name_cms_cond_globaltag()
4534 tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4538 tag_contains_ref_hist_key =
False
4539 if self.use_ref_hists
and tag_exists:
4541 tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4545 if self.use_ref_hists:
4546 ret_val = tag_exists
and tag_contains_ref_hist_key
4548 ret_val = tag_exists
4558 """Check if globaltag exists.
4562 self.logger.info(
"Checking existence of GlobalTag `%s'" % \
4564 self.logger.debug(
" (Using database connection `%s')" % \
4567 cmd =
"cmscond_tagtree_list -c %s -T %s" % \
4568 (connect_name, globaltag)
4569 (status, output) = commands.getstatusoutput(cmd)
4571 output.find(
"error") > -1:
4572 msg =
"Could not check existence of GlobalTag `%s' in `%s'" % \
4573 (globaltag, connect_name)
4574 if output.find(
".ALL_TABLES not found") > -1:
4576 "Missing database account `%s'" % \
4577 (msg, output.split(
".ALL_TABLES")[0].
split()[-1])
4578 self.logger.fatal(msg)
4579 self.logger.debug(
"Command used:")
4580 self.logger.debug(
" %s" % cmd)
4581 self.logger.debug(
"Output received:")
4582 self.logger.debug(output)
4584 if output.find(
"does not exist") > -1:
4585 self.logger.debug(
"GlobalTag `%s' does not exist in `%s':" % \
4586 (globaltag, connect_name))
4587 self.logger.debug(
"Output received:")
4588 self.logger.debug(output)
4592 self.logger.info(
" GlobalTag exists? -> %s" % tag_exists)
4600 """Check if globaltag contains the required RefHistos key.
4605 tag_contains_key =
None
4606 ref_hist_key =
"RefHistos"
4607 self.logger.info(
"Checking existence of reference " \
4608 "histogram key `%s' in GlobalTag `%s'" % \
4609 (ref_hist_key, globaltag))
4610 self.logger.debug(
" (Using database connection `%s')" % \
4612 cmd =
"cmscond_tagtree_list -c %s -T %s -n %s" % \
4613 (connect_name, globaltag, ref_hist_key)
4614 (status, output) = commands.getstatusoutput(cmd)
4616 output.find(
"error") > -1:
4617 msg =
"Could not check existence of key `%s'" % \
4618 (ref_hist_key, connect_name)
4619 self.logger.fatal(msg)
4620 self.logger.debug(
"Command used:")
4621 self.logger.debug(
" %s" % cmd)
4622 self.logger.debug(
"Output received:")
4623 self.logger.debug(
" %s" % output)
4626 self.logger.debug(
"Required key for use of reference " \
4627 "histograms `%s' does not exist " \
4628 "in GlobalTag `%s':" % \
4629 (ref_hist_key, globaltag))
4630 self.logger.debug(
"Output received:")
4631 self.logger.debug(output)
4632 tag_contains_key =
False
4634 tag_contains_key =
True
4636 self.logger.info(
" GlobalTag contains `%s' key? -> %s" % \
4637 (ref_hist_key, tag_contains_key))
4640 return tag_contains_key
4645 """Check the existence of tag_name in database connect_name.
4647 Check if tag_name exists as a reference histogram tag in the
4648 database given by self.frontier_connection_name['refhists'].
4652 connect_name = self.frontier_connection_name[
"refhists"]
4653 connect_name += self.db_account_name_cms_cond_dqm_summary()
4655 self.logger.debug(
"Checking existence of reference " \
4656 "histogram tag `%s'" % \
4658 self.logger.debug(
" (Using database connection `%s')" % \
4661 cmd =
"cmscond_list_iov -c %s" % \
4663 (status, output) = commands.getstatusoutput(cmd)
4665 msg =
"Could not check existence of tag `%s' in `%s'" % \
4666 (tag_name, connect_name)
4667 self.logger.fatal(msg)
4668 self.logger.debug(
"Command used:")
4669 self.logger.debug(
" %s" % cmd)
4670 self.logger.debug(
"Output received:")
4671 self.logger.debug(output)
4673 if not tag_name
in output.split():
4674 self.logger.debug(
"Reference histogram tag `%s' " \
4675 "does not exist in `%s'" % \
4676 (tag_name, connect_name))
4677 self.logger.debug(
" Existing tags: `%s'" % \
4678 "', `".
join(output.split()))
4682 self.logger.debug(
" Reference histogram tag exists? " \
4683 "-> %s" % tag_exists)
4691 """Build the es_prefer snippet for the reference histograms.
4693 The building of the snippet is wrapped in some care-taking
4694 code that figures out the name of the reference histogram set
4695 and makes sure the corresponding tag exists.
4701 ref_hist_tag_name = self.ref_hist_mappings[dataset_name]
4703 connect_name = self.frontier_connection_name[
"refhists"]
4704 connect_name += self.db_account_name_cms_cond_dqm_summary()
4705 record_name =
"DQMReferenceHistogramRootFileRcd"
4709 code_lines.append(
"from CondCore.DBCommon.CondDBSetup_cfi import *")
4710 code_lines.append(
"process.ref_hist_source = cms.ESSource(\"PoolDBESSource\", CondDBSetup,")
4711 code_lines.append(
" connect = cms.string(\"%s\")," % connect_name)
4712 code_lines.append(
" toGet = cms.VPSet(cms.PSet(record = cms.string(\"%s\")," % record_name)
4713 code_lines.append(
" tag = cms.string(\"%s\"))," % ref_hist_tag_name)
4714 code_lines.append(
" )")
4715 code_lines.append(
" )")
4716 code_lines.append(
"process.es_prefer_ref_hist_source = cms.ESPrefer(\"PoolDBESSource\", \"ref_hist_source\")")
4718 snippet =
"\n".
join(code_lines)
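
    # Rendered illustration (connect string and tag name are
    # hypothetical): the snippet produced above expands to something like
    #
    #   from CondCore.DBCommon.CondDBSetup_cfi import *
    #   process.ref_hist_source = cms.ESSource("PoolDBESSource", CondDBSetup,
    #       connect = cms.string("frontier://..."),
    #       toGet = cms.VPSet(cms.PSet(record = cms.string("DQMReferenceHistogramRootFileRcd"),
    #                                  tag = cms.string("SomeRefHistTag")),
    #                         ))
    #   process.es_prefer_ref_hist_source = cms.ESPrefer("PoolDBESSource", "ref_hist_source")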
4726 """Create the Python harvesting configuration for harvesting.
4728 The basic configuration is created by
4729 Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4730 what cmsDriver.py does.) After that we add some specials
4733 NOTE: On one hand it may not be nice to circumvent
4734 cmsDriver.py, on the other hand cmsDriver.py does not really
4735 do anything itself. All the real work is done by the
4736 ConfigBuilder so there is not much risk that we miss out on
4737 essential developments of cmsDriver in the future.
4742 config_options = defaultOptions
4747 config_options.name =
"harvesting"
4748 config_options.scenario =
"pp"
4749 config_options.number = 1
4750 config_options.arguments = self.ident_string()
4751 config_options.evt_type = config_options.name
4752 config_options.customisation_file =
None
4753 config_options.filein =
"dummy_value"
4754 config_options.filetype =
"EDM"
4756 config_options.gflash =
"dummy_value"
4760 config_options.dbsquery =
""
4767 config_options.step =
"HARVESTING:%s" % \
4768 self.harvesting_info[self.harvesting_type] \
4770 config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4772 config_options.eventcontent = self.harvesting_info \
4773 [self.harvesting_type] \
4775 config_options.harvesting = self.harvesting_info \
4776 [self.harvesting_type] \
4783 datatype = self.datasets_information[dataset_name][
"datatype"]
4784 config_options.isMC = (datatype.lower() ==
"mc")
4785 config_options.isData = (datatype.lower() ==
"data")
4786 globaltag = self.datasets_information[dataset_name][
"globaltag"]
4788 config_options.conditions = self.format_conditions_string(globaltag)
4792 if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4798 config_builder.prepare(
True)
4799 config_contents = config_builder.pythonCfgCode
4808 marker_lines.append(sep)
4809 marker_lines.append(
"# Code between these markers was generated by")
4810 marker_lines.append(
"# Configuration.PyReleaseValidation." \
4813 marker_lines.append(sep)
4816 tmp = [self.config_file_header()]
4820 tmp.append(config_contents)
4824 config_contents =
"\n".
join(tmp)
4829 customisations = [
""]
4831 customisations.append(
"# Now follow some customisations")
4832 customisations.append(
"")
4833 connect_name = self.frontier_connection_name[
"globaltag"]
4834 connect_name += self.db_account_name_cms_cond_globaltag()
4835 customisations.append(
"process.GlobalTag.connect = \"%s\"" % \
4839 if self.saveByLumiSection ==
True:
4840 customisations.append(
"process.dqmSaver.saveByLumiSection = 1")
4844 customisations.append(
"")
4858 use_es_prefer = (self.harvesting_type ==
"RelVal")
4859 use_refs = use_es_prefer
or \
4860 (
not self.harvesting_type ==
"MC")
4862 use_refs = use_refs
and self.use_ref_hists
4870 customisations.append(
"print \"Not using reference histograms\"")
4871 customisations.append(
"if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4872 customisations.append(
" for (sequence_name, sequence) in process.sequences.iteritems():")
4873 customisations.append(
" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4874 customisations.append(
" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4875 customisations.append(
" sequence_name")
4876 customisations.append(
"process.dqmSaver.referenceHandling = \"skip\"")
4880 customisations.append(
"process.dqmSaver.referenceHandling = \"all\"")
4882 es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4883 customisations.append(es_prefer_snippet)
4887 workflow_name = dataset_name
4888 if self.harvesting_mode ==
"single-step-allow-partial":
4889 workflow_name +=
"_partial"
4890 customisations.append(
"process.dqmSaver.workflow = \"%s\"" % \
4927 config_contents = config_contents +
"\n".
join(customisations)
4932 return config_contents
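
    # Illustration (values are hypothetical): the customisation block
    # glued onto the ConfigBuilder output ends with lines like
    #
    #   # Now follow some customisations
    #   process.GlobalTag.connect = "frontier://..."
    #   process.dqmSaver.referenceHandling = "all"
    #   process.dqmSaver.workflow = "/Cosmics/Run2010A-v1/RECO"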

    def create_me_extraction_config(self, dataset_name):
        """Create the config for the ME summary extraction (the first
        step of two-step harvesting)."""

        tmp = []
        tmp.append(self.config_file_header())
        tmp.append("")
        tmp.append("import FWCore.ParameterSet.Config as cms")
        tmp.append("")
        tmp.append("process = cms.Process(\"ME2EDM\")")
        tmp.append("")
        tmp.append("# Import of standard configurations")
        tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
        tmp.append("")
        tmp.append("# We don't really process any events, just keep this set to one to")
        tmp.append("# make sure things work.")
        tmp.append("process.maxEvents = cms.untracked.PSet(")
        tmp.append("    input = cms.untracked.int32(1)")
        tmp.append("    )")
        tmp.append("")
        tmp.append("process.options = cms.untracked.PSet(")
        tmp.append("    Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
        tmp.append("    )")
        tmp.append("")
        tmp.append("process.source = cms.Source(\"PoolSource\",")
        tmp.append("    processingMode = \\")
        tmp.append("    cms.untracked.string(\"RunsAndLumis\"),")
        tmp.append("    fileNames = \\")
        tmp.append("    cms.untracked.vstring(\"no_file_specified\")")
        tmp.append("    )")
        tmp.append("")
        tmp.append("# Output definition: drop everything except for the monitoring.")
        tmp.append("process.output = cms.OutputModule(")
        tmp.append("    \"PoolOutputModule\",")
        tmp.append("    outputCommands = \\")
        tmp.append("    cms.untracked.vstring(\"drop *\", \\")
        tmp.append("                          \"keep *_MEtoEDMConverter_*_*\"),")
        output_file_name = self. \
                           create_output_file_name(dataset_name)
        tmp.append("    fileName = \\")
        tmp.append("    cms.untracked.string(\"%s\")," % output_file_name)
        tmp.append("    dataset = cms.untracked.PSet(")
        tmp.append("        dataTier = cms.untracked.string(\"RECO\"),")
        tmp.append("        filterName = cms.untracked.string(\"\")")
        tmp.append("        )")
        tmp.append("    )")
        tmp.append("")
        tmp.append("# Additional output definition")
        tmp.append("process.out_step = cms.EndPath(process.output)")
        tmp.append("")
        tmp.append("# Schedule definition")
        tmp.append("process.schedule = cms.Schedule(process.out_step)")
        tmp.append("")

        config_contents = "\n".join(tmp)

        return config_contents
5049 """Write a CRAB job configuration Python file.
5053 self.logger.info(
"Writing CRAB configuration...")
5055 file_name_base =
"crab.cfg"
5058 crab_contents = self.create_crab_config()
5061 crab_file_name = file_name_base
5063 crab_file =
file(crab_file_name,
"w")
5064 crab_file.write(crab_contents)
5067 self.logger.fatal(
"Could not write " \
5068 "CRAB configuration to file `%s'" % \
5070 raise Error(
"ERROR: Could not write to file `%s'!" % \
5078 """Write a multi-CRAB job configuration Python file.
5082 self.logger.info(
"Writing multi-CRAB configuration...")
5084 file_name_base =
"multicrab.cfg"
5087 multicrab_contents = self.create_multicrab_config()
5090 multicrab_file_name = file_name_base
5092 multicrab_file =
file(multicrab_file_name,
"w")
5093 multicrab_file.write(multicrab_contents)
5094 multicrab_file.close()
5096 self.logger.fatal(
"Could not write " \
5097 "multi-CRAB configuration to file `%s'" % \
5098 multicrab_file_name)
5099 raise Error(
"ERROR: Could not write to file `%s'!" % \
5100 multicrab_file_name)
5107 """Write a harvesting job configuration Python file.
5109 NOTE: This knows nothing about single-step or two-step
5110 harvesting. That's all taken care of by
5111 create_harvesting_config.
5115 self.logger.debug(
"Writing harvesting configuration for `%s'..." % \
5119 config_contents = self.create_harvesting_config(dataset_name)
5122 config_file_name = self. \
5125 config_file =
file(config_file_name,
"w")
5126 config_file.write(config_contents)
5129 self.logger.fatal(
"Could not write " \
5130 "harvesting configuration to file `%s'" % \
5132 raise Error(
"ERROR: Could not write to file `%s'!" % \
5140 """Write an ME-extraction configuration Python file.
5142 This `ME-extraction' (ME = Monitoring Element) is the first
5143 step of the two-step harvesting.
5147 self.logger.debug(
"Writing ME-extraction configuration for `%s'..." % \
5151 config_contents = self.create_me_extraction_config(dataset_name)
5154 config_file_name = self. \
5157 config_file =
file(config_file_name,
"w")
5158 config_file.write(config_contents)
5161 self.logger.fatal(
"Could not write " \
5162 "ME-extraction configuration to file `%s'" % \
5164 raise Error(
"ERROR: Could not write to file `%s'!" % \
5173 """Check if we need to load and check the reference mappings.
5175 For data the reference histograms should be taken
5176 automatically from the GlobalTag, so we don't need any
5177 mappings. For RelVals we need to know a mapping to be used in
5178 the es_prefer code snippet (different references for each of
5181 WARNING: This implementation is a bit convoluted.
5187 if not dataset_name
is None:
5188 data_type = self.datasets_information[dataset_name] \
5190 mappings_needed = (data_type ==
"mc")
5192 if not mappings_needed:
5193 assert data_type ==
"data"
5196 tmp = [self.ref_hist_mappings_needed(dataset_name) \
5197 for dataset_name
in \
5198 self.datasets_information.keys()]
5199 mappings_needed = (
True in tmp)
5202 return mappings_needed
5207 """Load the reference histogram mappings from file.
5209 The dataset name to reference histogram name mappings are read
5210 from a text file specified in self.ref_hist_mappings_file_name.
5215 assert len(self.ref_hist_mappings) < 1, \
5216 "ERROR Should not be RE-loading " \
5217 "reference histogram mappings!"
5220 self.logger.info(
"Loading reference histogram mappings " \
5221 "from file `%s'" % \
5222 self.ref_hist_mappings_file_name)
5224 mappings_lines =
None
5226 mappings_file =
file(self.ref_hist_mappings_file_name,
"r")
5227 mappings_lines = mappings_file.readlines()
5228 mappings_file.close()
5230 msg =
"ERROR: Could not open reference histogram mapping "\
5231 "file `%s'" % self.ref_hist_mappings_file_name
5232 self.logger.fatal(msg)
5242 for mapping
in mappings_lines:
5244 if not mapping.startswith(
"#"):
5245 mapping = mapping.strip()
5246 if len(mapping) > 0:
5247 mapping_pieces = mapping.split()
5248 if len(mapping_pieces) != 2:
5249 msg =
"ERROR: The reference histogram mapping " \
5250 "file contains a line I don't " \
5251 "understand:\n %s" % mapping
5252 self.logger.fatal(msg)
5254 dataset_name = mapping_pieces[0].strip()
5255 ref_hist_name = mapping_pieces[1].strip()
5259 if self.ref_hist_mappings.has_key(dataset_name):
5260 msg =
"ERROR: The reference histogram mapping " \
5261 "file contains multiple mappings for " \
5263 self.logger.fatal(msg)
5267 self.ref_hist_mappings[dataset_name] = ref_hist_name
5271 self.logger.info(
" Successfully loaded %d mapping(s)" % \
5272 len(self.ref_hist_mappings))
5273 max_len =
max([len(i)
for i
in self.ref_hist_mappings.keys()])
5274 for (map_from, map_to)
in self.ref_hist_mappings.iteritems():
5275 self.logger.info(
" %-*s -> %s" % \
5276 (max_len, map_from, map_to))
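
    # Illustration of the expected mapping file format (one dataset
    # name and one reference histogram name per line, whitespace
    # separated; the names below are made up):
    #
    #   # Comment lines are skipped.
    #   /RelValMinBias/CMSSW_3_8_2-MC_38Y_V9-v1/GEN-SIM-RECO  RefHistMinBias
    #   /RelValTTbar/CMSSW_3_8_2-MC_38Y_V9-v1/GEN-SIM-RECO    RefHistTTbar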
5283 """Make sure all necessary reference histograms exist.
5285 Check that for each of the datasets to be processed a
5286 reference histogram is specified and that that histogram
5287 exists in the database.
5289 NOTE: There's a little complication here. Since this whole
5290 thing was designed to allow (in principle) harvesting of both
5291 data and MC datasets in one go, we need to be careful to check
5292 the availability fof reference mappings only for those
5293 datasets that need it.
5297 self.logger.info(
"Checking reference histogram mappings")
5299 for dataset_name
in self.datasets_to_use:
5301 ref_hist_name = self.ref_hist_mappings[dataset_name]
5303 msg =
"ERROR: No reference histogram mapping found " \
5304 "for dataset `%s'" % \
5306 self.logger.fatal(msg)
5309 if not self.check_ref_hist_tag(ref_hist_name):
5310 msg =
"Reference histogram tag `%s' " \
5311 "(used for dataset `%s') does not exist!" % \
5312 (ref_hist_name, dataset_name)
5313 self.logger.fatal(msg)
5316 self.logger.info(
" Done checking reference histogram mappings.")
5323 """Obtain all information on the datasets that we need to run.
5325 Use DBS to figure out all required information on our
5326 datasets, like the run numbers and the GlobalTag. All
5327 information is stored in the datasets_information member
5342 self.datasets_information = {}
5343 self.logger.info(
"Collecting information for all datasets to process")
5344 dataset_names = self.datasets_to_use.keys()
5345 dataset_names.sort()
5346 for dataset_name
in dataset_names:
5350 self.logger.info(sep_line)
5351 self.logger.info(
" `%s'" % dataset_name)
5352 self.logger.info(sep_line)
5354 runs = self.dbs_resolve_runs(dataset_name)
5355 self.logger.info(
" found %d run(s)" % len(runs))
5357 self.logger.debug(
" run number(s): %s" % \
5358 ", ".
join([str(i)
for i
in runs]))
5362 self.logger.warning(
" --> skipping dataset "
5364 assert False,
"Panic: found a dataset without runs " \
5368 cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5369 self.logger.info(
" found CMSSW version `%s'" % cmssw_version)
5372 datatype = self.dbs_resolve_datatype(dataset_name)
5373 self.logger.info(
" sample is data or MC? --> %s" % \
5379 if self.globaltag
is None:
5380 globaltag = self.dbs_resolve_globaltag(dataset_name)
5382 globaltag = self.globaltag
5384 self.logger.info(
" found GlobalTag `%s'" % globaltag)
5390 assert datatype ==
"data", \
5391 "ERROR Empty GlobalTag for MC dataset!!!"
5399 sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5403 for run_number
in sites_catalog.keys():
5404 num_events[run_number] = sites_catalog \
5405 [run_number][
"all_sites"]
5406 del sites_catalog[run_number][
"all_sites"]
5411 for run_number
in sites_catalog.keys():
5412 mirror_catalog[run_number] = sites_catalog \
5413 [run_number][
"mirrored"]
5414 del sites_catalog[run_number][
"mirrored"]
5443 self.datasets_information[dataset_name] = {}
5444 self.datasets_information[dataset_name][
"runs"] = runs
5445 self.datasets_information[dataset_name][
"cmssw_version"] = \
5447 self.datasets_information[dataset_name][
"globaltag"] = globaltag
5448 self.datasets_information[dataset_name][
"datatype"] = datatype
5449 self.datasets_information[dataset_name][
"num_events"] = num_events
5450 self.datasets_information[dataset_name][
"mirrored"] = mirror_catalog
5451 self.datasets_information[dataset_name][
"sites"] = sites_catalog
5455 castor_path_common = self.create_castor_path_name_common(dataset_name)
5456 self.logger.info(
" output will go into `%s'" % \
5460 [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5462 for path_name
in castor_paths.values():
5463 self.logger.debug(
" %s" % path_name)
5464 self.datasets_information[dataset_name][
"castor_path"] = \
5472 """Tell the user what to do now, after this part is done.
5474 This should provide the user with some (preferably
5475 copy-pasteable) instructions on what to do now with the setups
5476 and files that have been created.
5486 self.logger.info(
"")
5487 self.logger.info(sep_line)
5488 self.logger.info(
" Configuration files have been created.")
5489 self.logger.info(
" From here on please follow the usual CRAB instructions.")
5490 self.logger.info(
" Quick copy-paste instructions are shown below.")
5491 self.logger.info(sep_line)
5493 self.logger.info(
"")
5494 self.logger.info(
" Create all CRAB jobs:")
5495 self.logger.info(
" multicrab -create")
5496 self.logger.info(
"")
5497 self.logger.info(
" Submit all CRAB jobs:")
5498 self.logger.info(
" multicrab -submit")
5499 self.logger.info(
"")
5500 self.logger.info(
" Check CRAB status:")
5501 self.logger.info(
" multicrab -status")
5502 self.logger.info(
"")
5504 self.logger.info(
"")
5505 self.logger.info(
" For more information please see the CMS Twiki:")
5506 self.logger.info(
" %s" % twiki_url)
5507 self.logger.info(sep_line)
5511 if not self.all_sites_found:
5512 self.logger.warning(
" For some of the jobs no matching " \
5513 "site could be found")
5514 self.logger.warning(
" --> please scan your multicrab.cfg" \
5515 "for occurrences of `%s'." % \
5516 self.no_matching_site_found_str)
5517 self.logger.warning(
" You will have to fix those " \
5525 "Main entry point of the CMS harvester."
5535 self.parse_cmd_line_options()
5537 self.check_input_status()
5550 self.setup_harvesting_info()
5553 self.build_dataset_use_list()
5555 self.build_dataset_ignore_list()
5558 self.build_runs_use_list()
5559 self.build_runs_ignore_list()
5566 self.process_dataset_ignore_list()
5570 self.build_datasets_information()
5572 if self.use_ref_hists
and \
5573 self.ref_hist_mappings_needed():
5576 self.load_ref_hist_mappings()
5580 self.check_ref_hist_mappings()
5582 self.logger.info(
"No need to load reference " \
5583 "histogram mappings file")
5598 self.process_runs_use_and_ignore_lists()
5603 if self.harvesting_mode ==
"single-step-allow-partial":
5604 self.singlify_datasets()
5607 self.check_dataset_list()
5609 if len(self.datasets_to_use) < 1:
5610 self.logger.info(
"After all checks etc. " \
5611 "there are no datasets (left?) " \
5615 self.logger.info(
"After all checks etc. we are left " \
5616 "with %d dataset(s) to process " \
5617 "for a total of %d runs" % \
5618 (len(self.datasets_to_use),
5619 sum([len(i)
for i
in \
5620 self.datasets_to_use.values()])))
5647 self.create_and_check_castor_dirs()
5651 self.write_crab_config()
5652 self.write_multicrab_config()
5662 for dataset_name
in self.datasets_to_use.keys():
5664 self.write_harvesting_config(dataset_name)
5665 if self.harvesting_mode ==
"two-step":
5666 self.write_me_extraction_config(dataset_name)
5674 for run_number
in self.datasets_to_use[dataset_name]:
5675 tmp[run_number] = self.datasets_information \
5676 [dataset_name][
"num_events"][run_number]
5677 if self.book_keeping_information. \
5678 has_key(dataset_name):
5679 self.book_keeping_information[dataset_name].
update(tmp)
5681 self.book_keeping_information[dataset_name] = tmp
5684 self.show_exit_message()
5695 except Exception, err:
5704 if isinstance(err, SystemExit):
5705 self.logger.fatal(err.code)
5706 elif not isinstance(err, KeyboardInterrupt):
5707 self.logger.fatal(
"!" * 50)
5708 self.logger.fatal(
" This looks like a serious problem.")
5709 self.logger.fatal(
" If you are sure you followed all " \
5711 self.logger.fatal(
" please copy the below stack trace together")
5712 self.logger.fatal(
" with a description of what you were doing to")
5713 self.logger.fatal(
" jeroen.hegeman@cern.ch.")
5714 self.logger.fatal(
" %s" % self.ident_string())
5715 self.logger.fatal(
"!" * 50)
5716 self.logger.fatal(str(err))
5718 traceback_string = traceback.format_exc()
5719 for line
in traceback_string.split(
"\n"):
5720 self.logger.fatal(line)
5721 self.logger.fatal(
"!" * 50)
5736 if self.crab_submission ==
True:
5737 os.system(
"multicrab -create")
5738 os.system(
"multicrab -submit")
5749 if __name__ ==
"__main__":
5750 "Main entry point for harvesting."