cmsHarvester.py
1 #!/usr/bin/env python
2 
3 ###########################################################################
4 ## File : cmsHarvester.py
5 ## Authors : Jeroen Hegeman (jeroen.hegeman@cern.ch)
6 ## Niklas Pietsch (niklas.pietsch@desy.de)
7 ## Francesco Costanza (francesco.costanza@desy.de)
8 ## Last change: 20100308
9 ##
10 ## Purpose : Main program to run all kinds of harvesting.
11 ## For more information please refer to the CMS Twiki url
12 ## mentioned just below here.
13 ###########################################################################
14 
15 """Main program to run all kinds of harvesting.
16 
17 These are the basic kinds of harvesting implemented (contact me if
18 your favourite is missing):
19 
20 - RelVal : Run for release validation samples. Makes heavy use of MC
21  truth information.
22 
23 - RelValFS: FastSim RelVal.
24 
25 - MC : Run for MC samples.
26 
27 - DQMOffline : Run for real data (could also be run for MC).
28 
29 For the mappings of these harvesting types to sequence names please
30 see the setup_harvesting_info() and option_handler_list_types()
31 methods.
32 
33 """
34 
35 ###########################################################################
36 
37 __version__ = "3.8.2p1" # (version jump to match release)
38 __author__ = "Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
39  "Niklas Pietsch (niklas.pietsch@desy.de)"
40 
41 twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
42 
43 ###########################################################################
44 
45 ###########################################################################
46 ## TODO list
47 ##
48 ## !!! Some code refactoring is in order. A lot of the code that loads
49 ## and builds dataset and run lists is duplicated. !!!
50 ##
51 ## - SPECIAL (future):
52 ## After discussing all these harvesting issues yet again with Luca,
53 ## it looks like we'll need something special to handle harvesting
54 ## for (collisions) data in reprocessing. Stuff with a special DBS
55 ## instance in which `rolling' reports of reprocessed datasets are
56 ## published. In this case we will have to check (w.r.t. the parent
57 ## dataset) how much of a given run is ready, and submit once we're
58 ## satisfied (let's say 90%).
59 ##
60 ## - We could get rid of most of the `and dataset.status = VALID'
61 ## pieces in the DBS queries.
62 ## - Change to a more efficient grid scheduler.
63 ## - Implement incremental harvesting. Requires some changes to the
64 ## book keeping to store the harvested number of events for each
65 ## run. Also requires some changes to the dataset checking to see if
66 ## additional statistics have become available.
67 ## - Emphasize the warnings in case we're running in force
68 ## mode. Otherwise they may get lost a bit in the output.
69 ## - Fix the creation of the CASTOR dirs. The current approach works
70 ## but is a) too complicated and b) too inefficient.
71 ## - Fully implement all harvesting types.
72 ## --> Discuss with Andreas what exactly should be in there. And be
73 ## careful with the run numbers!
74 ## - Add the (second-step) harvesting config file to the (first-step)
75 ## ME extraction job to make sure it does not get lost.
76 ## - Improve sanity checks on harvesting type vs. data type.
77 ## - Implement reference histograms.
78 ## 1) User-specified reference dataset.
79 ## 2) Educated guess based on dataset name.
80 ## 3) References from GlobalTag.
81 ## 4) No reference at all.
82 ## - Is this options.evt_type used anywhere?
83 ## - Combine all these dbs_resolve_xxx into a single call to DBS(?).
84 ## - Implement CRAB server use?
85 ## - Add implementation of email address of user. (Only necessary for
86 ## CRAB server.)
87 ###########################################################################
88 
89 import os
90 import sys
91 import commands
92 import re
93 import logging
94 import optparse
95 import datetime
96 import copy
97 from inspect import getargspec
98 from random import choice
99 
100 
101 # These we need to communicate with DBS global DBSAPI
102 from DBSAPI.dbsApi import DbsApi
103 import DBSAPI.dbsException
105 # and these we need to parse the DBS output.
106 global xml
107 global SAXParseException
108 import xml.sax
109 from xml.sax import SAXParseException
110 
111 import Configuration.PyReleaseValidation
112 from Configuration.PyReleaseValidation.ConfigBuilder import \
113  ConfigBuilder, defaultOptions
114 # from Configuration.PyReleaseValidation.cmsDriverOptions import options, python_config_filename
115 
116 #import FWCore.ParameterSet.Config as cms
117 
118 # Debugging stuff.
119 import pdb
120 try:
121  import debug_hook
122 except ImportError:
123  pass
124 
125 ###########################################################################
126 ## Helper class: Usage exception.
127 ###########################################################################
128 class Usage(Exception):
129  def __init__(self, msg):
130  self.msg = msg
131  def __str__(self):
132  return repr(self.msg)
133 
134  # End of Usage.
135 
136 ###########################################################################
137 ## Helper class: Error exception.
138 ###########################################################################
139 class Error(Exception):
140  def __init__(self, msg):
141  self.msg = msg
142  def __str__(self):
143  return repr(self.msg)
144 
145 ###########################################################################
146 ## Helper class: CMSHarvesterHelpFormatter.
147 ###########################################################################
148 
149 class CMSHarvesterHelpFormatter(optparse.IndentedHelpFormatter):
150  """Helper class to add some customised help output to cmsHarvester.
151 
152  We want to add some instructions, as well as a pointer to the CMS
153  Twiki.
154 
155  """
156 
157  def format_usage(self, usage):
158 
159  usage_lines = []
160 
161  sep_line = "-" * 60
162  usage_lines.append(sep_line)
163  usage_lines.append("Welcome to the CMS harvester, a (hopefully useful)")
164  usage_lines.append("tool to create harvesting configurations.")
165  usage_lines.append("For more information please have a look at the CMS Twiki:")
166  usage_lines.append(" %s" % twiki_url)
167  usage_lines.append(sep_line)
168  usage_lines.append("")
169 
170  # Since we only add to the output, we now just append the
171  # original output from IndentedHelpFormatter.
172  usage_lines.append(optparse.IndentedHelpFormatter. \
173  format_usage(self, usage))
174 
175  formatted_usage = "\n".join(usage_lines)
176  return formatted_usage
177 
178  # End of CMSHarvesterHelpFormatter.
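# Illustrative sketch (not part of cmsHarvester itself): one way a
# formatter like the one above could be plugged into optparse. The
# BannerHelpFormatter class, the build_parser() helper, the `demo'
# program name and the `--demo' option are all hypothetical; only
# optparse.OptionParser and its `formatter' argument are standard
# library API.

```python
import optparse

TWIKI_URL = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"


class BannerHelpFormatter(optparse.IndentedHelpFormatter):
    """Prepend a short banner to the standard usage text."""

    def format_usage(self, usage):
        sep_line = "-" * 60
        lines = [sep_line,
                 "For more information see: %s" % TWIKI_URL,
                 sep_line,
                 ""]
        # Append the unmodified output of the base class.
        lines.append(optparse.IndentedHelpFormatter.format_usage(self, usage))
        return "\n".join(lines)


def build_parser():
    # Hypothetical parser, used only to exercise the formatter.
    parser = optparse.OptionParser(usage="%prog [options]", prog="demo",
                                   formatter=BannerHelpFormatter())
    parser.add_option("--demo", action="store_true", default=False,
                      help="hypothetical example option")
    return parser


# format_help() obtains the usage section from the formatter.
help_text = build_parser().format_help()
```

# Since only format_usage() is overridden, the option descriptions
# are still laid out by IndentedHelpFormatter.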
179 
180 ###########################################################################
181 ## Helper class: DBSXMLHandler.
182 ###########################################################################
183 
184 class DBSXMLHandler(xml.sax.handler.ContentHandler):
185  """XML handler class to parse DBS results.
186 
187  The tricky thing here is that older DBS versions (2.0.5 and
188  earlier) return results in a different XML format than newer
189  versions. Previously the result values were returned as attributes
190  to the `result' element. The new approach returns result values as
191  contents of named elements.
192 
193  The old approach is handled directly in startElement(), the new
194  approach in characters().
195 
196  NOTE: All results are returned in the form of string values of
197  course!
198 
199  """
200 
201  # This is the required mapping from the name of the variable we
202  # ask for to what we call it ourselves. (Effectively this is the
203  # mapping between the old attribute key name and the new element
204  # name.)
205  mapping = {
206  "dataset" : "PATH",
207  "dataset.tag" : "PROCESSEDDATASET_GLOBALTAG",
208  "datatype.type" : "PRIMARYDSTYPE_TYPE",
209  "run" : "RUNS_RUNNUMBER",
210  "run.number" : "RUNS_RUNNUMBER",
211  "file.name" : "FILES_LOGICALFILENAME",
212  "file.numevents" : "FILES_NUMBEROFEVENTS",
213  "algo.version" : "APPVERSION_VERSION",
214  "site" : "STORAGEELEMENT_SENAME",
215  }
216 
217  def __init__(self, tag_names):
218  # This is a list used as stack to keep track of where we are
219  # in the element tree.
220  self.element_position = []
221  self.tag_names = tag_names
222  self.results = {}
223 
224  def startElement(self, name, attrs):
225  self.element_position.append(name)
226 
227  self.current_value = []
228 
229  #----------
230 
231  # This is to catch results from DBS 2.0.5 and earlier.
232  if name == "result":
233  for name in self.tag_names:
234  key = DBSXMLHandler.mapping[name]
235  value = str(attrs[key])
236  try:
237  self.results[name].append(value)
238  except KeyError:
239  self.results[name] = [value]
240 
241  #----------
242 
243  def endElement(self, name):
244  assert self.current_element() == name, \
245  "closing unopened element `%s'" % name
246 
247  if self.current_element() in self.tag_names:
248  contents = "".join(self.current_value)
249  if self.results.has_key(self.current_element()):
250  self.results[self.current_element()].append(contents)
251  else:
252  self.results[self.current_element()] = [contents]
253 
254  self.element_position.pop()
255 
256  def characters(self, content):
257  # NOTE: It is possible that the actual contents of the tag
258  # gets split into multiple pieces. This method will be called
259  # for each of the pieces. This means we have to concatenate
260  # everything ourselves.
261  if self.current_element() in self.tag_names:
262  self.current_value.append(content)
263 
264  def current_element(self):
265  return self.element_position[-1]
266 
267  def check_results_validity(self):
268  """Make sure that all results arrays have equal length.
269 
270  We should have received complete rows from DBS. I.e. all
271  results arrays in the handler should be of equal length.
272 
273  """
274 
275  results_valid = True
276 
277  res_names = self.results.keys()
278  if len(res_names) > 1:
279  for res_name in res_names[1:]:
280  res_tmp = self.results[res_name]
281  if len(res_tmp) != len(self.results[res_names[0]]):
282  results_valid = False
283 
284  return results_valid
285 
286  # End of DBSXMLHandler.
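# Illustrative sketch (not the DBS wire format): a stripped-down SAX
# handler that accepts both result styles described in the
# DBSXMLHandler docstring. The TwoFormatHandler class, the parse()
# helper and the two XML snippets are hypothetical and made up for
# this example. Unlike the class above, the sketch skips attributes
# that are absent instead of raising a KeyError.

```python
import xml.sax
import xml.sax.handler


class TwoFormatHandler(xml.sax.handler.ContentHandler):
    """Collect values delivered either as attributes or as element text."""

    # Hypothetical mapping from requested name to old-style attribute key.
    mapping = {"run": "RUNS_RUNNUMBER"}

    def __init__(self, tag_names):
        xml.sax.handler.ContentHandler.__init__(self)
        self.tag_names = tag_names
        self.element_position = []  # stack of currently open elements
        self.current_value = []     # text pieces of the current element
        self.results = {}

    def startElement(self, name, attrs):
        self.element_position.append(name)
        self.current_value = []
        # Old style: values are attributes of the `result' element.
        if name == "result":
            for tag in self.tag_names:
                key = self.mapping[tag]
                if key in attrs:
                    self.results.setdefault(tag, []).append(str(attrs[key]))

    def characters(self, content):
        # Text may arrive in several pieces, so collect all of them.
        if self.element_position[-1] in self.tag_names:
            self.current_value.append(content)

    def endElement(self, name):
        # New style: values are the text content of named elements.
        if name in self.tag_names:
            text = "".join(self.current_value)
            self.results.setdefault(name, []).append(text)
        self.element_position.pop()


def parse(xml_bytes, tag_names):
    handler = TwoFormatHandler(tag_names)
    xml.sax.parseString(xml_bytes, handler)
    return handler.results


OLD_STYLE = (b"<dbs><result RUNS_RUNNUMBER='123'/>"
             b"<result RUNS_RUNNUMBER='124'/></dbs>")
NEW_STYLE = (b"<dbs><result><run>123</run></result>"
             b"<result><run>124</run></result></dbs>")

old_results = parse(OLD_STYLE, ["run"])
new_results = parse(NEW_STYLE, ["run"])
```

# Both calls yield the same dictionary of string lists, which is the
# property the equal-length check above relies on.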
287 
288 ###########################################################################
289 ## CMSHarvester class.
290 ###########################################################################
291 
292 class CMSHarvester(object):
293  """Class to perform CMS harvesting.
294 
295  More documentation `obviously' to follow.
296 
297  """
298 
299  ##########
300 
301  def __init__(self, cmd_line_opts=None):
302  "Initialize class and process command line options."
303 
304  self.version = __version__
305 
306  # These are the harvesting types allowed. See the head of this
307  # file for more information.
308  self.harvesting_types = [
309  "RelVal",
310  "RelValFS",
311  "MC",
312  "DQMOffline",
313  ]
314 
315  # These are the possible harvesting modes:
316  # - Single-step: harvesting takes place on-site in a single
317  # step. For each sample only a single ROOT file containing
318  # the harvesting results is returned.
319  # - Single-step-allow-partial: special hack to allow
320  # harvesting of partial statistics using single-step
321  # harvesting on spread-out samples.
322  # - Two-step: harvesting takes place in two steps. The first
323  # step returns a series of monitoring element summaries for
324  # each sample. The second step then merges these summaries
325  # locally and does the real harvesting. This second step
326  # produces the ROOT file containing the harvesting results.
327  self.harvesting_modes = [
328  "single-step",
329  "single-step-allow-partial",
330  "two-step"
331  ]
332 
333  # It is possible to specify a GlobalTag that will override any
334  # choices (regarding GlobalTags) made by the cmsHarvester.
335  # BUG BUG BUG
336  # For the moment, until I figure out a way to obtain the
337  # GlobalTag with which a given data (!) dataset was created,
338  # it is necessary to specify a GlobalTag for harvesting of
339  # data.
340  # BUG BUG BUG end
341  self.globaltag = None
342 
343  # It's also possible to switch off the use of reference
344  # histograms altogether.
345  self.use_ref_hists = True
346 
347  # The database name and account are hard-coded. They are not
348  # likely to change before the end-of-life of this tool. But of
349  # course there is a way to override this from the command
350  # line. One can even override the Frontier connection used for
351  # the GlobalTag and for the reference histograms
352  # independently. Please only use this for testing purposes.
353  self.frontier_connection_name = {}
354  self.frontier_connection_name["globaltag"] = "frontier://" \
355  "FrontierProd/"
356  self.frontier_connection_name["refhists"] = "frontier://" \
357  "FrontierProd/"
358  self.frontier_connection_overridden = {}
359  for key in self.frontier_connection_name.keys():
360  self.frontier_connection_overridden[key] = False
361 
362  # This contains information specific to each of the harvesting
363  # types. Used to create the harvesting configuration. It is
364  # filled by setup_harvesting_info().
365  self.harvesting_info = None
366 
367  ###
368 
369  # These are default `unused' values that will be filled in
370  # depending on the command line options.
371 
372  # The type of harvesting we're doing. See
373  # self.harvesting_types for allowed types.
374  self.harvesting_type = None
375 
376  # The harvesting mode, popularly known as single-step
377  # vs. two-step. The thing to remember at this point is that
378  # single-step is only possible for samples located completely
379  # at a single site (i.e. SE).
380  self.harvesting_mode = None
381  # BUG BUG BUG
382  # Default temporarily set to two-step until we can get staged
383  # jobs working with CRAB.
384  self.harvesting_mode_default = "single-step"
385  # BUG BUG BUG end
386 
387  # The input method: are we reading a dataset name (or regexp)
388  # directly from the command line or are we reading a file
389  # containing a list of dataset specifications. Actually we
390  # keep one of each for both datasets and runs.
391  self.input_method = {}
392  self.input_method["datasets"] = {}
393  self.input_method["datasets"]["use"] = None
394  self.input_method["datasets"]["ignore"] = None
395  self.input_method["runs"] = {}
396  self.input_method["runs"]["use"] = None
397  self.input_method["runs"]["ignore"] = None
399  # The name of whatever input we're using.
400  self.input_name = {}
401  self.input_name["datasets"] = {}
402  self.input_name["datasets"]["use"] = None
403  self.input_name["datasets"]["ignore"] = None
404  self.input_name["runs"] = {}
405  self.input_name["runs"]["use"] = None
406  self.input_name["runs"]["ignore"] = None
407 
408  self.Jsonlumi = False
409  self.Jsonfilename = "YourJSON.txt"
410  self.Jsonrunfilename = "YourJSON.txt"
411  self.todofile = "YourToDofile.txt"
412 
413  # If this is true, we're running in `force mode'. In this case
414  # the sanity checks are performed but failure will not halt
415  # everything.
416  self.force_running = None
417 
418  # The base path of the output dir in CASTOR.
419  self.castor_base_dir = None
420  self.castor_base_dir_default = "/castor/cern.ch/" \
421  "cms/store/temp/" \
422  "dqm/offline/harvesting_output/"
423 
424  # The name of the file to be used for book keeping: which
425  # datasets, runs, etc. we have already processed.
426  self.book_keeping_file_name = None
427  self.book_keeping_file_name_default = "harvesting_accounting.txt"
428 
429  # The dataset name to reference histogram name mapping is read
430  # from a text file. The name of this file is kept in the
431  # following variable.
432  self.ref_hist_mappings_file_name = None
433  # And this is the default value.
434  self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"
435 
436  # Hmmm, hard-coded prefix of the CERN CASTOR area. This is the
437  # only supported CASTOR area.
438  # NOTE: Make sure this one starts with a `/'.
439  self.castor_prefix = "/castor/cern.ch"
440 
441  # Normally the central harvesting should be done using the
442  # `t1access' grid role. To be able to run without T1 access
443  # the --no-t1access flag can be used. This variable keeps
444  # track of that special mode.
445  self.non_t1access = False
446  self.caf_access = False
447  self.saveByLumiSection = False
448  self.crab_submission = False
449  self.nr_max_sites = 1
450 
451  self.preferred_site = "no preference"
452 
453  # This will become the list of datasets and runs to consider
454  self.datasets_to_use = {}
455  # and this will become the list of datasets and runs to skip.
456  self.datasets_to_ignore = {}
457  # This, in turn, will hold all book keeping information.
458  self.book_keeping_information = {}
459  # And this is where the dataset name to reference histogram
460  # name mapping is stored.
461  self.ref_hist_mappings = {}
462 
463  # We're now also allowing run selection. This means we also
464  # have to keep lists of the runs requested and vetoed by the user.
465  self.runs_to_use = {}
466  self.runs_to_ignore = {}
467 
468  # Cache for CMSSW version availability at different sites.
469  self.sites_and_versions_cache = {}
470 
471  # Cache for checked GlobalTags.
472  self.globaltag_check_cache = []
473 
474  # Global flag to see if there were any jobs for which we could
475  # not find a matching site.
476  self.all_sites_found = True
477 
478  # Helper string centrally defined.
479  self.no_matching_site_found_str = "no_matching_site_found"
480 
481  # Store command line options for later use.
482  if cmd_line_opts is None:
483  cmd_line_opts = sys.argv[1:]
484  self.cmd_line_opts = cmd_line_opts
485 
486  # Set up the logger.
487  log_handler = logging.StreamHandler()
488  # This is the default log formatter, the debug option switches
489  # on some more information.
490  log_formatter = logging.Formatter("%(message)s")
491  log_handler.setFormatter(log_formatter)
492  logger = logging.getLogger()
493  logger.name = "main"
494  logger.addHandler(log_handler)
495  self.logger = logger
496  # The default output mode is quite verbose.
497  self.set_output_level("NORMAL")
498 
499  #logger.debug("Initialized successfully")
500 
501  # End of __init__.
502 
503  ##########
504 
505  def cleanup(self):
506  "Clean up after ourselves."
507 
508  # NOTE: This is the safe replacement of __del__.
509 
510  #self.logger.debug("All done -> shutting down")
511  logging.shutdown()
512 
513  # End of cleanup.
514 
515  ##########
516 
517  def time_stamp(self):
518  "Create a timestamp to use in the created config files."
519 
520  time_now = datetime.datetime.utcnow()
521  # We don't care about the microseconds.
522  time_now = time_now.replace(microsecond = 0)
523  time_stamp = "%sUTC" % datetime.datetime.isoformat(time_now)
524 
525  # End of time_stamp.
526  return time_stamp
527 
528  ##########
529 
530  def ident_string(self):
531  "Spit out an identification string for cmsHarvester.py."
532 
533  ident_str = "`cmsHarvester.py " \
534  "version %s': cmsHarvester.py %s" % \
535  (__version__,
536  " ".join(sys.argv[1:]))
537 
538  return ident_str
539 
540  ##########
541 
542  def format_conditions_string(self, globaltag):
543  """Create the conditions string needed for `cmsDriver'.
544 
545  Just glueing the FrontierConditions bit in front of it really.
546 
547  """
548 
549  # Not very robust but okay. The idea is that if the user
550  # specified (since this cannot happen with GlobalTags coming
551  # from DBS) something containing `conditions', they probably
552  # know what they're doing and we should not muck things up. In
553  # all other cases we just assume we only received the
554  # GlobalTag part and we build the usual conditions string from
555  # that.
556  if globaltag.lower().find("conditions") > -1:
557  conditions_string = globaltag
558  else:
559  conditions_string = "FrontierConditions_GlobalTag,%s" % \
560  globaltag
561 
562  # End of format_conditions_string.
563  return conditions_string
564 
565  ##########
566 
567  def db_account_name_cms_cond_globaltag(self):
568  """Return the database account name used to store the GlobalTag.
569 
570  The name of the database account depends (albeit weakly) on
571  the CMSSW release version.
572 
573  """
574 
575  # This never changed, unlike the cms_cond_31X_DQM_SUMMARY ->
576  # cms_cond_34X_DQM transition.
577  account_name = "CMS_COND_31X_GLOBALTAG"
578 
579  # End of db_account_name_cms_cond_globaltag.
580  return account_name
581 
582  ##########
583 
584  def db_account_name_cms_cond_dqm_summary(self):
585  """See db_account_name_cms_cond_globaltag."""
586 
587  account_name = None
588  version = self.cmssw_version[6:11]
589  if version < "3_4_0":
590  account_name = "CMS_COND_31X_DQM_SUMMARY"
591  else:
592  account_name = "CMS_COND_34X"
593 
594  # End of db_account_name_cms_cond_dqm_summary.
595  return account_name
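# Illustrative sketch (an alternative, not what the method above
# does): the same account choice made on integer tuples instead of on
# a string slice. Comparing strings orders a two-digit minor version
# such as `3_10' before `3_4'. The helper names are hypothetical and
# the sketch assumes version strings of the form CMSSW_X_Y_Z with an
# optional suffix.

```python
import re


def parse_cmssw_version(cmssw_version):
    """Return (major, minor, patch) as integers from e.g. `CMSSW_3_8_2'."""
    match = re.match(r"^CMSSW_(\d+)_(\d+)_(\d+)", cmssw_version)
    if match is None:
        raise ValueError("Unexpected CMSSW version `%s'" % cmssw_version)
    return tuple(int(piece) for piece in match.groups())


def dqm_summary_account_name(cmssw_version):
    # Same decision as in the method above, but on integer tuples.
    if parse_cmssw_version(cmssw_version) < (3, 4, 0):
        return "CMS_COND_31X_DQM_SUMMARY"
    return "CMS_COND_34X"


accounts = [dqm_summary_account_name(version)
            for version in ("CMSSW_3_3_6",
                            "CMSSW_3_8_2",
                            "CMSSW_3_10_0_pre1")]
```

# Tuples compare element by element, so (3, 10, 0) sorts after
# (3, 4, 0) while the string "3_10_" sorts before "3_4_0".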
596 
597  ##########
598 
599  def config_file_header(self):
600  "Create a nice header to be used to mark the generated files."
601 
602  tmp = []
603 
604  time_stamp = self.time_stamp()
605  ident_str = self.ident_string()
606  tmp.append("# %s" % time_stamp)
607  tmp.append("# WARNING: This file was created automatically!")
608  tmp.append("")
609  tmp.append("# Created by %s" % ident_str)
610 
611  header = "\n".join(tmp)
612 
613  # End of config_file_header.
614  return header
615 
616  ##########
617 
618  def set_output_level(self, output_level):
619  """Adjust the level of output generated.
620 
621  Choices are:
622  - normal : default level of output
623  - quiet : less output than the default
624  - verbose : some additional information
625  - debug : lots more information, may be overwhelming
626 
627  NOTE: The debug option is a bit special in the sense that it
628  also modifies the output format.
629 
630  """
631 
632  # NOTE: These levels are hooked up to the ones used in the
633  # logging module.
634  output_levels = {
635  "NORMAL" : logging.INFO,
636  "QUIET" : logging.WARNING,
637  "VERBOSE" : logging.INFO,
638  "DEBUG" : logging.DEBUG
639  }
640 
641  output_level = output_level.upper()
642 
643  try:
644  # Update the logger.
645  self.log_level = output_levels[output_level]
646  self.logger.setLevel(self.log_level)
647  except KeyError:
648  # Show a complaint
649  self.logger.fatal("Unknown output level `%s'" % output_level)
650  # and re-raise an exception.
651  raise Exception
652 
653  # End of set_output_level.
654 
655  ##########
656 
657  def option_handler_debug(self, option, opt_str, value, parser):
658  """Switch to debug mode.
659 
660  This both increases the amount of output generated, as well as
661  changes the format used (more detailed information is given).
662 
663  """
664 
665  # Switch to a more informative log formatter for debugging.
666  log_formatter_debug = logging.Formatter("[%(levelname)s] " \
667  # NOTE: funcName was
668  # only implemented
669  # starting with python
670  # 2.5.
671  #"%(funcName)s() " \
672  #"@%(filename)s:%(lineno)d " \
673  "%(message)s")
674  # Hmmm, not very nice. This assumes there's only a single
675  # handler associated with the current logger.
676  log_handler = self.logger.handlers[0]
677  log_handler.setFormatter(log_formatter_debug)
678  self.set_output_level("DEBUG")
679 
680  # End of option_handler_debug.
681 
682  ##########
683 
684  def option_handler_quiet(self, option, opt_str, value, parser):
685  "Switch to quiet mode: less verbose."
686 
687  self.set_output_level("QUIET")
688 
689  # End of option_handler_quiet.
690 
691  ##########
692 
693  def option_handler_force(self, option, opt_str, value, parser):
694  """Switch on `force mode' in which case we don't brake for nobody.
695 
696  In so-called `force mode' all sanity checks are performed but
697  we don't halt on failure. Of course this requires some care
698  from the user.
699 
700  """
701 
702  self.logger.debug("Switching on `force mode'.")
703  self.force_running = True
704 
705  # End of option_handler_force.
706 
707  ##########
708 
709  def option_handler_harvesting_type(self, option, opt_str, value, parser):
710  """Set the harvesting type to be used.
711 
712  This checks that no harvesting type is already set, and sets
713  the harvesting type to be used to the one specified. If a
714  harvesting type is already set an exception is thrown. The
715  same happens when an unknown type is specified.
716 
717  """
718 
719  # Check for (in)valid harvesting types.
720  # NOTE: The matching is done in a bit of a complicated
721  # way. This allows the specification of the type to be
722  # case-insensitive while still ending up with the properly
723  # `cased' version afterwards.
724  value = value.lower()
725  harvesting_types_lowered = [i.lower() for i in self.harvesting_types]
726  try:
727  type_index = harvesting_types_lowered.index(value)
728  # If this works, we now have the index to the `properly
729  # cased' version of the harvesting type.
730  except ValueError:
731  self.logger.fatal("Unknown harvesting type `%s'" % \
732  value)
733  self.logger.fatal(" possible types are: %s" %
734  ", ".join(self.harvesting_types))
735  raise Usage("Unknown harvesting type `%s'" % \
736  value)
737 
738  # Check if multiple (by definition conflicting) harvesting
739  # types are being specified.
740  if not self.harvesting_type is None:
741  msg = "Only one harvesting type should be specified"
742  self.logger.fatal(msg)
743  raise Usage(msg)
744  self.harvesting_type = self.harvesting_types[type_index]
745 
746  self.logger.info("Harvesting type to be used: `%s'" % \
747  self.harvesting_type)
748 
749  # End of option_handler_harvesting_type.
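# Illustrative sketch of the matching trick used above, as a
# stand-alone function: the lookup is case-insensitive but the
# properly `cased' name is returned. The function name
# canonical_harvesting_type() is hypothetical, and the sketch raises
# ValueError where the handler above raises Usage.

```python
HARVESTING_TYPES = ["RelVal", "RelValFS", "MC", "DQMOffline"]


def canonical_harvesting_type(value, known_types=HARVESTING_TYPES):
    """Map a user-supplied type name to its canonical spelling."""
    lowered = [name.lower() for name in known_types]
    try:
        # The index found in the lowered list is also valid in the
        # original list, which holds the canonical spelling.
        index = lowered.index(value.lower())
    except ValueError:
        raise ValueError("Unknown harvesting type `%s'" % value)
    return known_types[index]


resolved = canonical_harvesting_type("dqmoffline")
```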
750 
751  ##########
752 
753  def option_handler_harvesting_mode(self, option, opt_str, value, parser):
754  """Set the harvesting mode to be used.
755 
756  Single-step harvesting can be used for samples that are
757  located completely at a single site (= SE). Otherwise use
758  two-step mode.
759 
760  """
761 
762  # Check for valid mode.
763  harvesting_mode = value.lower()
764  if not harvesting_mode in self.harvesting_modes:
765  msg = "Unknown harvesting mode `%s'" % harvesting_mode
766  self.logger.fatal(msg)
767  self.logger.fatal(" possible modes are: %s" % \
768  ", ".join(self.harvesting_modes))
769  raise Usage(msg)
770 
771  # Check if we've been given only a single mode, otherwise
772  # complain.
773  if not self.harvesting_mode is None:
774  msg = "Only one harvesting mode should be specified"
775  self.logger.fatal(msg)
776  raise Usage(msg)
777  self.harvesting_mode = harvesting_mode
778 
779  self.logger.info("Harvesting mode to be used: `%s'" % \
780  self.harvesting_mode)
781 
782  # End of option_handler_harvesting_mode.
783 
784  ##########
785 
786  def option_handler_globaltag(self, option, opt_str, value, parser):
787  """Set the GlobalTag to be used, overriding our own choices.
788 
789  By default the cmsHarvester will use the GlobalTag with which
790  a given dataset was created also for the harvesting. The
791  --globaltag option is the way to override this behaviour.
792 
793  """
794 
795  # Make sure that this flag only occurred once.
796  if not self.globaltag is None:
797  msg = "Only one GlobalTag should be specified"
798  self.logger.fatal(msg)
799  raise Usage(msg)
800  self.globaltag = value
801 
802  self.logger.info("GlobalTag to be used: `%s'" % \
803  self.globaltag)
804 
805  # End of option_handler_globaltag.
806 
807  ##########
808 
809  def option_handler_no_ref_hists(self, option, opt_str, value, parser):
810  "Switch use of all reference histograms off."
811 
812  self.use_ref_hists = False
813 
814  self.logger.warning("Switching off all use of reference histograms")
815 
816  # End of option_handler_no_ref_hists.
817 
818  ##########
819 
820  def option_handler_frontier_connection(self, option, opt_str,
821  value, parser):
822  """Override the default Frontier connection string.
823 
824  Please only use this for testing (e.g. when a test payload has
825  been inserted into cms_orc_off instead of cms_orc_on).
826 
827  This method gets called for three different command line
828  options:
829  - --frontier-connection,
830  - --frontier-connection-for-globaltag,
831  - --frontier-connection-for-refhists.
832  Appropriate care has to be taken to make sure things are only
833  specified once.
834 
835  """
836 
837  # Figure out with which command line option we've been called.
838  frontier_type = opt_str.split("-")[-1]
839  if frontier_type == "connection":
840  # Main option: change all connection strings.
841  frontier_types = self.frontier_connection_name.keys()
842  else:
843  frontier_types = [frontier_type]
844 
845  # Make sure that each Frontier connection is specified only
846  # once. (Okay, in a bit of a dodgy way...)
847  for connection_name in frontier_types:
848  if self.frontier_connection_overridden[connection_name] == True:
849  msg = "Please specify either:\n" \
850  " `--frontier-connection' to change the " \
851  "Frontier connection used for everything, or\n" \
852  "either one or both of\n" \
853  " `--frontier-connection-for-globaltag' to " \
854  "change the Frontier connection used for the " \
855  "GlobalTag and\n" \
856  " `--frontier-connection-for-refhists' to change " \
857  "the Frontier connection used for the " \
858  "reference histograms."
859  self.logger.fatal(msg)
860  raise Usage(msg)
861 
862  frontier_prefix = "frontier://"
863  if not value.startswith(frontier_prefix):
864  msg = "Expecting Frontier connections to start with " \
865  "`%s'. You specified `%s'." % \
866  (frontier_prefix, value)
867  self.logger.fatal(msg)
868  raise Usage(msg)
869  # We also kind of expect this to be either FrontierPrep or
870  # FrontierProd (but this is just a warning).
871  if value.find("FrontierProd") < 0 and \
872  value.find("FrontierPrep") < 0:
873  msg = "Expecting Frontier connections to contain either " \
874  "`FrontierProd' or `FrontierPrep'. You specified " \
875  "`%s'. Are you sure?" % \
876  value
877  self.logger.warning(msg)
878 
879  if not value.endswith("/"):
880  value += "/"
881 
882  for connection_name in frontier_types:
883  self.frontier_connection_name[connection_name] = value
884  self.frontier_connection_overridden[connection_name] = True
885 
886  frontier_type_str = "unknown"
887  if connection_name == "globaltag":
888  frontier_type_str = "the GlobalTag"
889  elif connection_name == "refhists":
890  frontier_type_str = "the reference histograms"
891 
892  self.logger.warning("Overriding default Frontier " \
893  "connection for %s " \
894  "with `%s'" % \
895  (frontier_type_str,
896  self.frontier_connection_name[connection_name]))
897 
898  # End of option_handler_frontier_connection
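# Illustrative sketch (not the real option setup): how a single
# optparse callback can serve the three option names listed in the
# docstring above by inspecting opt_str. The ConnectionSettings class
# and the parse_connections() helper are hypothetical, and the sanity
# checks of the real handler are left out.

```python
import optparse


class ConnectionSettings(object):
    """Hypothetical stand-in for the harvester's connection bookkeeping."""

    def __init__(self):
        self.names = {"globaltag": "frontier://FrontierProd/",
                      "refhists": "frontier://FrontierProd/"}

    def handle(self, option, opt_str, value, parser):
        # The last dash-separated piece of the option name tells us
        # which connection is meant.
        target = opt_str.split("-")[-1]
        if target == "connection":
            # Main option: change all connection strings.
            targets = list(self.names.keys())
        else:
            targets = [target]
        if not value.endswith("/"):
            value += "/"
        for name in targets:
            self.names[name] = value


def parse_connections(argv):
    settings = ConnectionSettings()
    parser = optparse.OptionParser()
    for flag in ("--frontier-connection",
                 "--frontier-connection-for-globaltag",
                 "--frontier-connection-for-refhists"):
        parser.add_option(flag, action="callback", type="string",
                          callback=settings.handle)
    parser.parse_args(argv)
    return settings.names


only_refhists = parse_connections(
    ["--frontier-connection-for-refhists", "frontier://FrontierPrep"])
everything = parse_connections(
    ["--frontier-connection", "frontier://FrontierPrep/"])
```

# In the first call only the `refhists' entry changes; in the second
# call both entries are replaced.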
899 
900  ##########
901 
902  def option_handler_input_todofile(self, option, opt_str, value, parser):
903 
904  self.todofile = value
905  # End of option_handler_input_todofile.
906 
907  ##########
908 
909  def option_handler_input_Jsonfile(self, option, opt_str, value, parser):
910 
911  self.Jsonfilename = value
912  # End of option_handler_input_Jsonfile.
913 
914  ##########
915 
916  def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser):
917 
918  self.Jsonrunfilename = value
919  # End of option_handler_input_Jsonrunfile.
920 
921  ##########
922 
923  def option_handler_input_spec(self, option, opt_str, value, parser):
924  """Record the input method and name for a selection option.
925 
926  Handles the `use' and `ignore' cases for both datasets and runs.
927  """
928 
929  # Figure out if we were called for the `use these' or the
930  # `ignore these' case.
931  if opt_str.lower().find("ignore") > -1:
932  spec_type = "ignore"
933  else:
934  spec_type = "use"
935 
936  # Similar: are we being called for datasets or for runs?
937  if opt_str.lower().find("dataset") > -1:
938  select_type = "datasets"
939  else:
940  select_type = "runs"
941 
942  if not self.input_method[select_type][spec_type] is None:
943  msg = "Please only specify one input method " \
944  "(for the `%s' case)" % opt_str
945  self.logger.fatal(msg)
946  raise Usage(msg)
947 
948  input_method = opt_str.replace("-", "").replace("ignore", "")
949  self.input_method[select_type][spec_type] = input_method
950  self.input_name[select_type][spec_type] = value
951 
952  self.logger.debug("Input method for the `%s' case: %s" % \
953  (spec_type, input_method))
954 
955  # End of option_handler_input_spec
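# NOTE (illustrative sketch, not part of cmsHarvester.py): the handler
# above derives three things from the option string alone. The small
# stand-alone version below mirrors that logic. The option names used in
# the usage lines and the ValueError (standing in for Usage) are
# assumptions made for illustration only, as is the initial layout of
# the nested dictionary.

```python
def classify_input_option(opt_str):
    """Return (select_type, spec_type, input_method) for an option string."""
    lowered = opt_str.lower()
    # `ignore these' versus `use these'.
    spec_type = "ignore" if "ignore" in lowered else "use"
    # Datasets versus runs.
    select_type = "datasets" if "dataset" in lowered else "runs"
    # Same clean-up as in the handler: drop dashes and the word `ignore'.
    input_method = opt_str.replace("-", "").replace("ignore", "")
    return (select_type, spec_type, input_method)


def new_input_table():
    """Assumed initial state: nothing specified yet."""
    return {"datasets": {"use": None, "ignore": None},
            "runs": {"use": None, "ignore": None}}


def register_input_option(table, opt_str):
    """Store the input method, refusing a second one for the same case."""
    select_type, spec_type, input_method = classify_input_option(opt_str)
    if table[select_type][spec_type] is not None:
        raise ValueError("Please only specify one input method "
                         "(for the `%s' case)" % opt_str)
    table[select_type][spec_type] = input_method
    return table
```

# Usage (hypothetical option names): classify_input_option("--dataset-ignore")
# yields ("datasets", "ignore", "dataset").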
956 
957  ##########
958 
959  def option_handler_book_keeping_file(self, option, opt_str, value, parser):
960  """Store the name of the file to be used for book keeping.
961 
962  The only check done here is that only a single book keeping
963  file is specified.
964 
965  """
966 
967  file_name = value
968 
969  if not self.book_keeping_file_name is None:
970  msg = "Only one book keeping file should be specified"
971  self.logger.fatal(msg)
972  raise Usage(msg)
973  self.book_keeping_file_name = file_name
974 
975  self.logger.info("Book keeping file to be used: `%s'" % \
976  self.book_keeping_file_name)
977 
978  # End of option_handler_book_keeping_file.
979 
980  ##########
981 
982  def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser):
983  """Store the name of the file for the ref. histogram mapping.
984 
985  """
986 
987  file_name = value
988 
989  if not self.ref_hist_mappings_file_name is None:
990  msg = "Only one reference histogram mapping file " \
991  "should be specified"
992  self.logger.fatal(msg)
993  raise Usage(msg)
994  self.ref_hist_mappings_file_name = file_name
995 
996  self.logger.info("Reference histogram mapping file " \
997  "to be used: `%s'" % \
998  self.ref_hist_mappings_file_name)
999 
1000  # End of option_handler_ref_hist_mapping_file.
1001 
1002  ##########
1003 
1004  # OBSOLETE OBSOLETE OBSOLETE
1005 
1006 ## def option_handler_dataset_name(self, option, opt_str, value, parser):
1007 ## """Specify the name(s) of the dataset(s) to be processed.
1008 
1009 ## It is checked to make sure that no dataset name or listfile
1010 ## names are given yet. If all is well (i.e. we still have a
1011 ## clean slate) the dataset name is stored for later use,
1012 ## otherwise a Usage exception is raised.
1013 
1014 ## """
1015 
1016 ## if not self.input_method is None:
1017 ## if self.input_method == "dataset":
1018 ## raise Usage("Please only feed me one dataset specification")
1019 ## elif self.input_method == "listfile":
1020 ## raise Usage("Cannot specify both dataset and input list file")
1021 ## else:
1022 ## assert False, "Unknown input method `%s'" % self.input_method
1023 ## self.input_method = "dataset"
1024 ## self.input_name = value
1025 ## self.logger.info("Input method used: %s" % self.input_method)
1026 
1027 ## # End of option_handler_dataset_name.
1028 
1029 ## ##########
1030 
1031 ## def option_handler_listfile_name(self, option, opt_str, value, parser):
1032 ## """Specify the input list file containing datasets to be processed.
1033 
1034 ## It is checked to make sure that no dataset name or listfile
1035 ## names are given yet. If all is well (i.e. we still have a
1036 ## clean slate) the listfile name is stored for later use,
1037 ## otherwise a Usage exception is raised.
1038 
1039 ## """
1040 
1041 ## if not self.input_method is None:
1042 ## if self.input_method == "listfile":
1043 ## raise Usage("Please only feed me one list file")
1044 ## elif self.input_method == "dataset":
1045 ## raise Usage("Cannot specify both dataset and input list file")
1046 ## else:
1047 ## assert False, "Unknown input method `%s'" % self.input_method
1048 ## self.input_method = "listfile"
1049 ## self.input_name = value
1050 ## self.logger.info("Input method used: %s" % self.input_method)
1051 
1052 ## # End of option_handler_listfile_name.
1053 
1054  # OBSOLETE OBSOLETE OBSOLETE end
1055 
1056  ##########
1057 
1058  def option_handler_castor_dir(self, option, opt_str, value, parser):
1059  """Specify where on CASTOR the output should go.
1060 
1061  At the moment only output to CERN CASTOR is
1062  supported. Eventually the harvested results should go into the
1063  central place for DQM on CASTOR anyway.
1064 
1065  """
1066 
1067  # Check format of specified CASTOR area.
1068  castor_dir = value
1069  #castor_dir = castor_dir.lstrip(os.path.sep)
1070  castor_prefix = self.castor_prefix
1071 
1072  # Add a leading slash if necessary and clean up the path.
1073  castor_dir = os.path.join(os.path.sep, castor_dir)
1074  self.castor_base_dir = os.path.normpath(castor_dir)
1075 
1076  self.logger.info("CASTOR (base) area to be used: `%s'" % \
1077  self.castor_base_dir)
1078 
1079  # End of option_handler_castor_dir.
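# NOTE (illustrative sketch, not part of cmsHarvester.py): the path
# clean-up in option_handler_castor_dir relies on two properties of the
# path functions: joining with the separator adds a leading slash only
# when one is missing, and normpath removes duplicate and trailing
# slashes. The sketch uses posixpath instead of os.path so that it
# behaves the same on any platform; that swap is an assumption made for
# the example. The user area `jdoe' is made up.

```python
import posixpath


def normalise_castor_dir(castor_dir):
    """Add a leading slash if necessary and clean up the path."""
    # If castor_dir is already absolute, join() simply returns it.
    castor_dir = posixpath.join(posixpath.sep, castor_dir)
    return posixpath.normpath(castor_dir)
```

# Usage: normalise_castor_dir("castor/cern.ch/user/j/jdoe/") gives
# "/castor/cern.ch/user/j/jdoe".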
1080 
1081  ##########
1082 
1083  def option_handler_no_t1access(self, option, opt_str, value, parser):
1084  """Set the self.non_t1access flag to try and create jobs that
1085  run without special `t1access' role.
1086 
1087  """
1088 
1089  self.non_t1access = True
1090 
1091  self.logger.warning("Running in `non-t1access' mode. " \
1092  "Will try to create jobs that run " \
1093  "without special rights but no " \
1094  "further promises...")
1095 
1096  # End of option_handler_no_t1access.
1097 
1098  ##########
1099 
1100  def option_handler_caf_access(self, option, opt_str, value, parser):
1101  """Set the self.caf_access flag to try and create jobs that
1102  run on the CAF.
1103 
1104  """
1105  self.caf_access = True
1106 
1107  self.logger.warning("Running in `caf_access' mode. " \
1108  "Will try to create jobs that run " \
1109  "on CAF but no " \
1110  "further promises...")
1111 
1112  # End of option_handler_caf_access.
1113 
1114  ##########
1115 
1116  def option_handler_saveByLumiSection(self, option, opt_str, value, parser):
1117  """Set process.dqmSaver.saveByLumiSection=1 in cfg harvesting file
1118  """
1119  self.saveByLumiSection = True
1120 
1121  self.logger.warning("warning concerning saveByLumiSection option")
1122 
1123  # End of option_handler_saveByLumiSection.
1124 
1125 
1126  ##########
1127 
1128  def option_handler_crab_submission(self, option, opt_str, value, parser):
1129  """Set the self.crab_submission flag, which controls whether
1130  CRAB jobs are created and submitted automatically.
1131  """
1132  self.crab_submission = True
1133 
1134  # End of option_handler_crab_submission.
1135 
1136  ##########
1137 
1138  def option_handler_sites(self, option, opt_str, value, parser):
1139 
1140  self.nr_max_sites = value
1141 
1142  ##########
1143 
1144  def option_handler_preferred_site(self, option, opt_str, value, parser):
1145 
1146  self.preferred_site = value
1147 
1148  ##########
1149 
1150  def option_handler_list_types(self, option, opt_str, value, parser):
1151  """List all harvesting types and their mappings.
1152 
1153  This lists all implemented harvesting types with their
1154  corresponding mappings to sequence names. This had to be
1155  separated out from the help since it depends on the CMSSW
1156  version and was making things a bit of a mess.
1157 
1158  NOTE: There is no way (at least not that I could come up with)
1159  to code this in a neat generic way that can be read both by
1160  this method and by setup_harvesting_info(). Please try hard to
1161  keep these two methods in sync!
1162 
1163  """
1164 
1165  sep_line = "-" * 50
1166  sep_line_short = "-" * 20
1167 
1168  print sep_line
1169  print "The following harvesting types are available:"
1170  print sep_line
1171 
1172  print "`RelVal' maps to:"
1173  print " pre-3_3_0 : HARVESTING:validationHarvesting"
1174  print " 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting"
1175  print " Exceptions:"
1176  print " 3_3_0_pre1-4 : HARVESTING:validationHarvesting"
1177  print " 3_3_0_pre6 : HARVESTING:validationHarvesting"
1178  print " 3_4_0_pre1 : HARVESTING:validationHarvesting"
1179 
1180  print sep_line_short
1181 
1182  print "`RelValFS' maps to:"
1183  print " always : HARVESTING:validationHarvestingFS"
1184 
1185  print sep_line_short
1186 
1187  print "`MC' maps to:"
1188  print " always : HARVESTING:validationprodHarvesting"
1189 
1190  print sep_line_short
1191 
1192  print "`DQMOffline' maps to:"
1193  print " always : HARVESTING:dqmHarvesting"
1194 
1195  print sep_line
1196 
1197  # We're done, let's quit. (This is the same thing optparse
1198  # does after printing the help.)
1199  raise SystemExit
1200 
1201  # End of option_handler_list_types.
1202 
1203  ##########
1204 
1205  def setup_harvesting_info(self):
1206  """Fill our dictionary with all info needed to understand
1207  harvesting.
1208 
1209  This depends on the CMSSW version since at some point the
1210  names and sequences were modified.
1211 
1212  NOTE: There is no way (at least not that I could come up with)
1213  to code this in a neat generic way that can be read both by
1214  this method and by option_handler_list_types(). Please try
1215  hard to keep these two methods in sync!
1216 
1217  """
1218 
1219  assert not self.cmssw_version is None, \
1220  "ERROR setup_harvesting_info() requires " \
1221  "self.cmssw_version to be set!!!"
1222 
1223  harvesting_info = {}
1224 
1225  # This is the version-independent part.
1226  harvesting_info["DQMOffline"] = {}
1227  harvesting_info["DQMOffline"]["beamspot"] = None
1228  harvesting_info["DQMOffline"]["eventcontent"] = None
1229  harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"
1230 
1231  harvesting_info["RelVal"] = {}
1232  harvesting_info["RelVal"]["beamspot"] = None
1233  harvesting_info["RelVal"]["eventcontent"] = None
1234  harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"
1235 
1236  harvesting_info["RelValFS"] = {}
1237  harvesting_info["RelValFS"]["beamspot"] = None
1238  harvesting_info["RelValFS"]["eventcontent"] = None
1239  harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"
1240 
1241  harvesting_info["MC"] = {}
1242  harvesting_info["MC"]["beamspot"] = None
1243  harvesting_info["MC"]["eventcontent"] = None
1244  harvesting_info["MC"]["harvesting"] = "AtRunEnd"
1245 
1246  # This is the version-dependent part. And I know, strictly
1247  # speaking it's not necessary to fill in all the types since
1248  # in a single run we'll only use one type anyway. This does
1249  # look more readable, however, and required less thought from
1250  # my side when I put this together.
1251 
1252  # DEBUG DEBUG DEBUG
1253  # Check that we understand our own version naming.
1254  assert self.cmssw_version.startswith("CMSSW_")
1255  # DEBUG DEBUG DEBUG end
1256 
1257  version = self.cmssw_version[6:]
1258 
1259  #----------
1260 
1261  # RelVal
1262  step_string = None
1263  if version < "3_3_0":
1264  step_string = "validationHarvesting"
1265  elif version in ["3_3_0_pre1", "3_3_0_pre2",
1266  "3_3_0_pre3", "3_3_0_pre4",
1267  "3_3_0_pre6", "3_4_0_pre1"]:
1268  step_string = "validationHarvesting"
1269  else:
1270  step_string = "validationHarvesting+dqmHarvesting"
1271 
1272  harvesting_info["RelVal"]["step_string"] = step_string
1273 
1274  # DEBUG DEBUG DEBUG
1275  # Let's make sure we found something.
1276  assert not step_string is None, \
1277  "ERROR Could not decide a RelVal harvesting sequence " \
1278  "for CMSSW version %s" % self.cmssw_version
1279  # DEBUG DEBUG DEBUG end
1280 
1281  #----------
1282 
1283  # RelValFS
1284  step_string = "validationHarvestingFS"
1285 
1286  harvesting_info["RelValFS"]["step_string"] = step_string
1287 
1288  #----------
1289 
1290  # MC
1291  step_string = "validationprodHarvesting"
1292 
1293  harvesting_info["MC"]["step_string"] = step_string
1294 
1295  # DEBUG DEBUG DEBUG
1296  # Let's make sure we found something.
1297  assert not step_string is None, \
1298  "ERROR Could not decide a MC harvesting " \
1299  "sequence for CMSSW version %s" % self.cmssw_version
1300  # DEBUG DEBUG DEBUG end
1301 
1302  #----------
1303 
1304  # DQMOffline
1305  step_string = "dqmHarvesting"
1306 
1307  harvesting_info["DQMOffline"]["step_string"] = step_string
1308 
1309  #----------
1310 
1311  self.harvesting_info = harvesting_info
1312 
1313  self.logger.info("Based on the CMSSW version (%s) " \
1314  "I decided to use the `HARVESTING:%s' " \
1315  "sequence for %s harvesting" % \
1316  (self.cmssw_version,
1317  self.harvesting_info[self.harvesting_type]["step_string"],
1318  self.harvesting_type))
1319 
1320  # End of setup_harvesting_info.
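# NOTE (illustrative sketch, not part of cmsHarvester.py): the RelVal
# branch of setup_harvesting_info compares version strings directly.
# The stand-alone function below mirrors that selection. Because the
# comparison is lexicographic, a version such as "3_10_0" (used here
# purely as a hypothetical example) sorts before "3_3_0". The helper
# version_key() is an assumed alternative, not something the harvester
# uses, showing how a numeric comparison could look.

```python
RELVAL_EXCEPTIONS = ["3_3_0_pre1", "3_3_0_pre2",
                     "3_3_0_pre3", "3_3_0_pre4",
                     "3_3_0_pre6", "3_4_0_pre1"]


def relval_step_string(cmssw_version):
    """Mirror the RelVal step_string choice, string comparison included."""
    if not cmssw_version.startswith("CMSSW_"):
        raise ValueError("Unexpected version name: %s" % cmssw_version)
    version = cmssw_version[6:]
    if version < "3_3_0":
        return "validationHarvesting"
    if version in RELVAL_EXCEPTIONS:
        return "validationHarvesting"
    return "validationHarvesting+dqmHarvesting"


def version_key(version):
    """Assumed helper: leading numeric fields as a tuple of ints."""
    key = []
    for field in version.split("_"):
        if not field.isdigit():
            break
        key.append(int(field))
    return tuple(key)
```

# Usage: relval_step_string("CMSSW_3_8_2") selects the combined
# validationHarvesting+dqmHarvesting sequence, while
# version_key("3_10_0") > version_key("3_3_0") holds even though the
# plain string comparison says the opposite.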
1321 
1322  ##########
1323 
1324  def create_castor_path_name_common(self, dataset_name):
1325  """Build the common part of the output path to be used on
1326  CASTOR.
1327 
1328  This consists of the CASTOR area base path specified by the
1329  user and a piece depending on the data type (data vs. MC), the
1330  harvesting type and the dataset name followed by a piece
1331  containing the run number and event count. (See comments in
1332  create_castor_path_name_special for details.) This method
1333  creates the common part, without run number and event count.
1334 
1335  """
1336 
1337  castor_path = self.castor_base_dir
1338 
1339  ###
1340 
1341  # The data type: data vs. mc.
1342  datatype = self.datasets_information[dataset_name]["datatype"]
1343  datatype = datatype.lower()
1344  castor_path = os.path.join(castor_path, datatype)
1345 
1346  # The harvesting type.
1347  harvesting_type = self.harvesting_type
1348  harvesting_type = harvesting_type.lower()
1349  castor_path = os.path.join(castor_path, harvesting_type)
1350 
1351  # The CMSSW release version (only the `digits'). Note that the
1352  # CMSSW version used here is the version used for harvesting,
1353  # not the one from the dataset. This does make the results
1354  # slightly harder to find. On the other hand it solves
1355  # problems in case one re-harvests a given dataset with a
1356  # different CMSSW version, which would lead to ambiguous path
1357  # names. (Of course for many cases the harvesting is done with
1358  # the same CMSSW version the dataset was created with.)
1359  release_version = self.cmssw_version
1360  release_version = release_version.lower(). \
1361  replace("cmssw", ""). \
1362  strip("_")
1363  castor_path = os.path.join(castor_path, release_version)
1364 
1365  # The dataset name.
1366  dataset_name_escaped = self.escape_dataset_name(dataset_name)
1367  castor_path = os.path.join(castor_path, dataset_name_escaped)
1368 
1369  ###
1370 
1371  castor_path = os.path.normpath(castor_path)
1372 
1373  # End of create_castor_path_name_common.
1374  return castor_path
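# NOTE (illustrative sketch, not part of cmsHarvester.py): the common
# part of the CASTOR path is the base dir followed by data type,
# harvesting type, release digits and escaped dataset name. The sketch
# below takes the already-escaped dataset name as an argument because
# escape_dataset_name() is not shown in this part of the listing. It
# uses posixpath for platform independence (an assumption), and the
# base dir and dataset name in the usage line are made up.

```python
import posixpath


def castor_path_common(base_dir, datatype, harvesting_type,
                       cmssw_version, dataset_name_escaped):
    """Build <base>/<datatype>/<harvesting type>/<release>/<dataset>."""
    # Only the `digits' of the release: "CMSSW_3_8_2" -> "3_8_2".
    release_version = cmssw_version.lower().replace("cmssw", "").strip("_")
    path = posixpath.join(base_dir,
                          datatype.lower(),
                          harvesting_type.lower(),
                          release_version,
                          dataset_name_escaped)
    return posixpath.normpath(path)
```

# Usage: castor_path_common("/castor/cern.ch/user/j/jdoe", "MC", "RelVal",
# "CMSSW_3_8_2", "some_dataset") gives
# "/castor/cern.ch/user/j/jdoe/mc/relval/3_8_2/some_dataset".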
1375 
1376  ##########
1377 
1378  def create_castor_path_name_special(self,
1379  dataset_name, run_number,
1380  castor_path_common):
1381  """Create the specialised part of the CASTOR output dir name.
1382 
1383  NOTE: To avoid clashes with `incremental harvesting'
1384  (re-harvesting when a dataset grows) we have to include the
1385  event count in the path name. The underlying `problem' is that
1386  CRAB does not overwrite existing output files so if the output
1387  file already exists CRAB will fail to copy back the output.
1388 
1389  NOTE: It's not possible to create different kinds of
1390  harvesting jobs in a single call to this tool. However, in
1391  principle it could be possible to create both data and MC jobs
1392  in a single go.
1393 
1394  NOTE: The number of events used in the path name is the
1395  _total_ number of events in the dataset/run at the time of
1396  harvesting. If we're doing partial harvesting the final
1397  results will reflect lower statistics. This is a) the easiest
1398  to code and b) the least likely to lead to confusion if
1399  someone ever decides to swap/copy around file blocks between
1400  sites.
1401 
1402  """
1403 
1404  castor_path = castor_path_common
1405 
1406  ###
1407 
1408  # The run number part.
1409  castor_path = os.path.join(castor_path, "run_%d" % run_number)
1410 
1411  ###
1412 
1413  # The event count. NOTE: The lookup is commented out, so for now
1414  # the literal string `nevents' is used as the path component.
1415  #nevents = self.datasets_information[dataset_name] \
1416  # ["num_events"][run_number]
1417  castor_path = os.path.join(castor_path, "nevents")
1418 
1419  ###
1420 
1421  castor_path = os.path.normpath(castor_path)
1422 
1423  # End of create_castor_path_name_special.
1424  return castor_path
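# NOTE (illustrative sketch, not part of cmsHarvester.py): as the code
# above stands, the specialised part adds a run-number directory and
# then the literal component "nevents" (the event-count lookup is
# commented out). A stand-alone version, using posixpath for platform
# independence (an assumption) and a made-up common path:

```python
import posixpath


def castor_path_special(castor_path_common, run_number):
    """Append run_<number>/nevents to the common CASTOR path."""
    path = posixpath.join(castor_path_common, "run_%d" % run_number)
    # Literal placeholder, exactly as in the listing above.
    path = posixpath.join(path, "nevents")
    return posixpath.normpath(path)
```

# Usage: castor_path_special("/castor/base/mc/relval/3_8_2/ds", 123456)
# gives "/castor/base/mc/relval/3_8_2/ds/run_123456/nevents".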
1425 
1426  ##########
1427 
1428  def create_and_check_castor_dirs(self):
1429  """Make sure all required CASTOR output dirs exist.
1430 
1431  This checks the CASTOR base dir specified by the user as well
1432  as all the subdirs required by the current set of jobs.
1433 
1434  """
1435 
1436  self.logger.info("Checking (and if necessary creating) CASTOR " \
1437  "output area(s)...")
1438 
1439  # Call the real checker method for the base dir.
1440  self.create_and_check_castor_dir(self.castor_base_dir)
1441 
1442  # Now call the checker for all (unique) subdirs.
1443  castor_dirs = []
1444  for (dataset_name, runs) in self.datasets_to_use.iteritems():
1445 
1446  for run in runs:
1447  castor_dirs.append(self.datasets_information[dataset_name] \
1448  ["castor_path"][run])
1449  castor_dirs_unique = list(set(castor_dirs))
1450  castor_dirs_unique.sort()
1451  # This can take some time. E.g. CRAFT08 has > 300 runs, each
1452  # of which will get a new directory. So we show some (rough)
1453  # info in between.
1454  ndirs = len(castor_dirs_unique)
1455  step = max(ndirs / 10, 1)
1456  for (i, castor_dir) in enumerate(castor_dirs_unique):
1457  if (i + 1) % step == 0 or \
1458  (i + 1) == ndirs:
1459  self.logger.info(" %d/%d" % \
1460  (i + 1, ndirs))
1461  self.create_and_check_castor_dir(castor_dir)
1462 
1463  # Now check if the directory is empty. If (an old version
1464  # of) the output file already exists CRAB will run new
1465  # jobs but never copy the results back. We assume the user
1466  # knows what they are doing and only issue a warning in
1467  # case the directory is not empty.
1468  self.logger.debug("Checking if path `%s' is empty" % \
1469  castor_dir)
1470  cmd = "rfdir %s" % castor_dir
1471  (status, output) = commands.getstatusoutput(cmd)
1472  if status != 0:
1473  msg = "Could not access directory `%s'" \
1474  " !!! This is bad since I should have just" \
1475  " created it !!!" % castor_dir
1476  self.logger.fatal(msg)
1477  raise Error(msg)
1478  if len(output) > 0:
1479  self.logger.warning("Output directory `%s' is not empty:" \
1480  " new jobs will fail to" \
1481  " copy back output" % \
1482  castor_dir)
1483 
1484  # End of create_and_check_castor_dirs.
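# NOTE (illustrative sketch, not part of cmsHarvester.py): the loop
# above removes duplicate directories, sorts them and prints a rough
# progress line about every tenth of the way plus once at the very end.
# The helpers below reproduce only that bookkeeping; the names are
# chosen for this example. Floor division (//) is used so the result is
# the same under Python 2 and Python 3.

```python
def unique_sorted(castor_dirs):
    """Unique directory names in sorted order."""
    return sorted(set(castor_dirs))


def progress_points(ndirs):
    """Return the 1-based counts at which a progress line is logged."""
    step = max(ndirs // 10, 1)
    return [i + 1 for i in range(ndirs)
            if (i + 1) % step == 0 or (i + 1) == ndirs]
```

# Usage: for 300 directories, progress_points(300) reports at
# 30, 60, ..., 300. For fewer than 20 directories the step is 1, so
# every directory produces a progress line.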
1485 
1486  ##########
1487 
1488  def create_and_check_castor_dir(self, castor_dir):
1489  """Check existence of the given CASTOR dir, if necessary create
1490  it.
1491 
1492  Some special care has to be taken with several things like
1493  setting the correct permissions such that CRAB can store the
1494  output results. Of course this means that things like
1495  /castor/cern.ch/ and user/j/ have to be recognised and treated
1496  properly.
1497 
1498  NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1499  the moment.
1500 
1501  NOTE: This method uses some slightly tricky caching to make
1502  sure we don't keep over and over checking the same base paths.
1503 
1504  """
1505 
1506  ###
1507 
1508  # Local helper function to fully split a path into pieces.
1509  def split_completely(path):
1510  (parent_path, name) = os.path.split(path)
1511  if name == "":
1512  return (parent_path, )
1513  else:
1514  return split_completely(parent_path) + (name, )
1515 
1516  ###
1517 
1518  # Local helper function to check rfio (i.e. CASTOR)
1519  # directories.
1520  def extract_permissions(rfstat_output):
1521  """Parse the output from rfstat and return the
1522  5-digit permissions string."""
1523 
1524  permissions_line = [i for i in rfstat_output.split("\n") \
1525  if i.lower().find("protection") > -1]
1526  regexp = re.compile(".*\(([0123456789]{5})\).*")
1527  match = regexp.search(rfstat_output)
1528  if not match or len(match.groups()) != 1:
1529  msg = "Could not extract permissions " \
1530  "from output: %s" % rfstat_output
1531  self.logger.fatal(msg)
1532  raise Error(msg)
1533  permissions = match.group(1)
1534 
1535  # End of extract_permissions.
1536  return permissions
1537 
1538  ###
1539 
1540  # These are the pieces of CASTOR directories that we do not
1541  # want to touch when modifying permissions.
1542 
1543  # NOTE: This is all a bit involved, basically driven by the
1544  # fact that one wants to treat the `j' directory of
1545  # `/castor/cern.ch/user/j/jhegeman/' specially.
1546  # BUG BUG BUG
1547  # This should be simplified, for example by comparing to the
1548  # CASTOR prefix or something like that.
1549  # BUG BUG BUG end
1550  castor_paths_dont_touch = {
1551  0: ["/", "castor", "cern.ch", "cms", "store", "temp",
1552  "dqm", "offline", "user"],
1553  -1: ["user", "store"]
1554  }
1555 
1556  self.logger.debug("Checking CASTOR path `%s'" % castor_dir)
1557 
1558  ###
1559 
1560  # First we take the full CASTOR path apart.
1561  castor_path_pieces = split_completely(castor_dir)
1562 
1563  # Now slowly rebuild the CASTOR path and see if a) all
1564  # permissions are set correctly and b) the final destination
1565  # exists.
1566  path = ""
1567  check_sizes = castor_paths_dont_touch.keys()
1568  check_sizes.sort()
1569  len_castor_path_pieces = len(castor_path_pieces)
1570  for piece_index in xrange (len_castor_path_pieces):
1571  skip_this_path_piece = False
1572  piece = castor_path_pieces[piece_index]
1573 ## self.logger.debug("Checking CASTOR path piece `%s'" % \
1574 ## piece)
1575  for check_size in check_sizes:
1576  # Do we need to do anything with this?
1577  if (piece_index + check_size) > -1:
1578 ## self.logger.debug("Checking `%s' against `%s'" % \
1579 ## (castor_path_pieces[piece_index + check_size],
1580 ## castor_paths_dont_touch[check_size]))
1581  if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
1582 ## self.logger.debug(" skipping")
1583  skip_this_path_piece = True
1584 ## else:
1585 ## # Piece not in the list, fine.
1586 ## self.logger.debug(" accepting")
1587  # Add piece to the path we're building.
1588 ## self.logger.debug("!!! Skip path piece `%s'? %s" % \
1589 ## (piece, str(skip_this_path_piece)))
1590 ## self.logger.debug("Adding piece to path...")
1591  path = os.path.join(path, piece)
1592 ## self.logger.debug("Path is now `%s'" % \
1593 ## path)
1594 
1595  # Hmmmm, only at this point can we do some caching. Not
1596  # ideal, but okay.
1597  try:
1598  if path in self.castor_path_checks_cache:
1599  continue
1600  except AttributeError:
1601  # This only happens the first time around.
1602  self.castor_path_checks_cache = []
1603  self.castor_path_checks_cache.append(path)
1604 
1605  # Now, unless we're supposed to skip this piece of the
1606  # path, let's make sure it exists and set the permissions
1607  # correctly for use by CRAB. This means that:
1608  # - the final output directory should (at least) have
1609  # permissions 775
1610  # - all directories above that should (at least) have
1611  # permissions 755.
1612 
1613  # BUT: Even though the above permissions are the usual
1614  # ones to use when setting up CASTOR areas for grid job
1615  # output, there is one caveat in case multiple people are
1616  # working in the same CASTOR area. If user X creates
1617  # /a/b/c/ and user Y wants to create /a/b/d/ he/she does
1618  # not have sufficient rights. So: we set all dir
1619  # permissions to 775 to avoid this.
1620 
1621  if not skip_this_path_piece:
1622 
1623  # Ok, first thing: let's make sure this directory
1624  # exists.
1625  # NOTE: The nice complication is of course that the
1626  # usual os.path.isdir() etc. methods don't work for an
1627  # rfio filesystem. So we call rfstat and interpret an
1628  # error as meaning that the path does not exist.
1629  self.logger.debug("Checking if path `%s' exists" % \
1630  path)
1631  cmd = "rfstat %s" % path
1632  (status, output) = commands.getstatusoutput(cmd)
1633  if status != 0:
1634  # Path does not exist, let's try and create it.
1635  self.logger.debug("Creating path `%s'" % path)
1636  cmd = "nsmkdir -m 775 %s" % path
1637  (status, output) = commands.getstatusoutput(cmd)
1638  if status != 0:
1639  msg = "Could not create directory `%s'" % path
1640  self.logger.fatal(msg)
1641  raise Error(msg)
1642  cmd = "rfstat %s" % path
1643  (status, output) = commands.getstatusoutput(cmd)
1644  # Now check that it looks like a directory. If I'm not
1645  # mistaken one can deduce this from the fact that the
1646  # (octal) permissions string starts with `40' (instead
1647  # of `100').
1648  permissions = extract_permissions(output)
1649  if not permissions.startswith("40"):
1650  msg = "Path `%s' is not a directory(?)" % path
1651  self.logger.fatal(msg)
1652  raise Error(msg)
1653 
1654  # Figure out the current permissions for this
1655  # (partial) path.
1656  self.logger.debug("Checking permissions for path `%s'" % path)
1657  cmd = "rfstat %s" % path
1658  (status, output) = commands.getstatusoutput(cmd)
1659  if status != 0:
1660  msg = "Could not obtain permissions for directory `%s'" % \
1661  path
1662  self.logger.fatal(msg)
1663  raise Error(msg)
1664  # Take the last three digits of the permissions.
1665  permissions = extract_permissions(output)[-3:]
1666 
1667  # Now if necessary fix permissions.
1668  # NOTE: Be careful never to `downgrade' permissions.
1669  if piece_index == (len_castor_path_pieces - 1):
1670  # This means we're looking at the final
1671  # destination directory.
1672  permissions_target = "775"
1673  else:
1674  # `Only' an intermediate directory.
1675  permissions_target = "775"
1676 
1677  # Compare permissions.
1678  permissions_new = []
1679  for (i, j) in zip(permissions, permissions_target):
1680  permissions_new.append(str(max(int(i), int(j))))
1681  permissions_new = "".join(permissions_new)
1682  self.logger.debug(" current permissions: %s" % \
1683  permissions)
1684  self.logger.debug(" target permissions : %s" % \
1685  permissions_target)
1686  if permissions_new != permissions:
1687  # We have to modify the permissions.
1688  self.logger.debug("Changing permissions of `%s' " \
1689  "to %s (were %s)" % \
1690  (path, permissions_new, permissions))
1691  cmd = "rfchmod %s %s" % (permissions_new, path)
1692  (status, output) = commands.getstatusoutput(cmd)
1693  if status != 0:
1694  msg = "Could not change permissions for path `%s' " \
1695  "to %s" % (path, permissions_new)
1696  self.logger.fatal(msg)
1697  raise Error(msg)
1698 
1699  self.logger.debug(" Permissions ok (%s)" % permissions_new)
1700 
1701  # End of create_and_check_castor_dir.
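# NOTE (illustrative sketch, not part of cmsHarvester.py): two small
# pieces of the method above are easy to try in isolation. The first
# is the recursive path splitter. The second is the `never downgrade'
# rule, which takes the digit-wise maximum of the current and target
# permissions. merge_permissions_or() is an assumed alternative, not
# what the harvester does: a digit-wise maximum can keep a digit such
# as 6 (rw-) where the target asked for 5 (r-x), whereas a bitwise OR
# yields 7. posixpath replaces os.path here for platform independence,
# and the user area `jdoe' is made up.

```python
import posixpath


def split_completely(path):
    """Fully split a path into pieces (expects no trailing slash)."""
    parent_path, name = posixpath.split(path)
    if name == "":
        return (parent_path,)
    return split_completely(parent_path) + (name,)


def merge_permissions_max(current, target):
    """Digit-wise maximum, as done in the listing above."""
    return "".join(str(max(int(c), int(t)))
                   for c, t in zip(current, target))


def merge_permissions_or(current, target):
    """Assumed alternative: digit-wise bitwise OR of the octal digits."""
    return "".join(str(int(c) | int(t))
                   for c, t in zip(current, target))
```

# Usage: split_completely("/castor/cern.ch/user/j/jdoe") gives
# ("/", "castor", "cern.ch", "user", "j", "jdoe").
# merge_permissions_max("755", "775") gives "775".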
1702 
1703  ##########
1704 
1705  def pick_a_site(self, sites, cmssw_version):
1706 
1707  # Create list of forbidden sites
1708  sites_forbidden = []
1709 
1710  if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
1711  self.caf_access = True
1712 
1713  if self.caf_access == False:
1714  sites_forbidden.append("caf.cern.ch")
1715 
1716  # These are the T1 sites. These are only forbidden if we're
1717  # running in non-T1 mode.
1718  # Source:
1719  # https://cmsweb.cern.ch/sitedb/sitelist/?naming_scheme=ce
1720  # Hard-coded, yes. Not nice, no.
1721 
1722  all_t1 = [
1723  "srm-cms.cern.ch",
1724  "ccsrm.in2p3.fr",
1725  "cmssrm-fzk.gridka.de",
1726  "cmssrm.fnal.gov",
1727  "gridka-dCache.fzk.de",
1728  "srm-cms.gridpp.rl.ac.uk",
1729  "srm.grid.sinica.edu.tw",
1730  "srm2.grid.sinica.edu.tw",
1731  "srmcms.pic.es",
1732  "storm-fe-cms.cr.cnaf.infn.it"
1733  ]
1734 
1735  country_codes = {
1736  "CAF" : "caf.cern.ch",
1737  "CH" : "srm-cms.cern.ch",
1738  "FR" : "ccsrm.in2p3.fr",
1739  "DE" : "cmssrm-fzk.gridka.de",
1740  "GOV" : "cmssrm.fnal.gov",
1741  "DE2" : "gridka-dCache.fzk.de",
1742  "UK" : "srm-cms.gridpp.rl.ac.uk",
1743  "TW" : "srm.grid.sinica.edu.tw",
1744  "TW2" : "srm2.grid.sinica.edu.tw",
1745  "ES" : "srmcms.pic.es",
1746  "IT" : "storm-fe-cms.cr.cnaf.infn.it"
1747  }
1748 
1749  if self.non_t1access:
1750  sites_forbidden.extend(all_t1)
1751 
1752  for site in sites_forbidden:
1753  if site in sites:
1754  sites.remove(site)
1755 
1756  if self.preferred_site in country_codes:
1757  self.preferred_site = country_codes[self.preferred_site]
1758 
1759  if self.preferred_site != "no preference":
1760  if self.preferred_site in sites:
1761  sites = [self.preferred_site]
1762  else:
1763  sites = []
1764 
1765  #print sites
1766 
1767  # Looks like we have to do some caching here, otherwise things
1768  # become waaaay toooo sloooooow. So that's what the
1769  # sites_and_versions_cache does.
1770 
1771  # NOTE: Keep this set to None!
1772  site_name = None
1773  cmd = None
1774  while len(sites) > 0 and \
1775  site_name is None:
1776 
1777  # Create list of t1_sites
1778  t1_sites = []
1779  for site in sites:
1780  if site in all_t1:
1781  t1_sites.append(site)
1782  if site == "caf.cern.ch":
1783  t1_sites.append(site)
1784 
1785  # If available pick preferred site
1786  #if self.preferred_site in sites:
1787  # se_name = self.preferred_site
1788  # Else, if available pick t1 site
1789 
1790  if len(t1_sites) > 0:
1791  se_name = choice(t1_sites)
1792  # Else pick any site
1793  else:
1794  se_name = choice(sites)
1795 
1796  # But check that it hosts the CMSSW version we want.
1797 
1798  if self.sites_and_versions_cache.has_key(se_name) and \
1799  self.sites_and_versions_cache[se_name].has_key(cmssw_version):
1800  if self.sites_and_versions_cache[se_name][cmssw_version]:
1801  site_name = se_name
1802  break
1803  else:
1804  self.logger.debug(" --> rejecting site `%s'" % se_name)
1805  sites.remove(se_name)
1806 
1807  else:
1808  self.logger.info("Checking if site `%s' " \
1809  "has CMSSW version `%s'" % \
1810  (se_name, cmssw_version))
1811  self.sites_and_versions_cache[se_name] = {}
1812 
1813  # TODO TODO TODO
1814  # Test for SCRAM architecture removed as per request
1815  # from Andreas.
1816  # scram_arch = os.getenv("SCRAM_ARCH")
1817  # cmd = "lcg-info --list-ce " \
1818  # "--query '" \
1819  # "Tag=VO-cms-%s," \
1820  # "Tag=VO-cms-%s," \
1821  # "CEStatus=Production," \
1822  # "CloseSE=%s'" % \
1823  # (cmssw_version, scram_arch, se_name)
1824  # TODO TODO TODO end
1825 
1826  cmd = "lcg-info --list-ce " \
1827  "--query '" \
1828  "Tag=VO-cms-%s," \
1829  "CEStatus=Production," \
1830  "CloseSE=%s'" % \
1831  (cmssw_version, se_name)
1832  (status, output) = commands.getstatusoutput(cmd)
1833  if status != 0:
1834  self.logger.error("Could not check site information " \
1835  "for site `%s'" % se_name)
1836  else:
1837  if (len(output) > 0) or (se_name == "caf.cern.ch"):
1838  self.sites_and_versions_cache[se_name][cmssw_version] = True
1839  site_name = se_name
1840  break
1841  else:
1842  self.sites_and_versions_cache[se_name][cmssw_version] = False
1843  self.logger.debug(" --> rejecting site `%s'" % se_name)
1844  sites.remove(se_name)
1845 
1846  if site_name is None:
1847  self.logger.error(" --> no matching site found")
1848  self.logger.error(" --> Your release or SCRAM " \
1849  "architecture may not be available " \
1850  "anywhere on the (LCG) grid.")
1851  if not cmd is None:
1852  self.logger.debug(" (command used: `%s')" % cmd)
1853  else:
1854  self.logger.debug(" --> selected site `%s'" % site_name)
1855 
1856  # Return something more descriptive (than `None') in case we
1857  # found nothing.
1858  if site_name is None:
1859  site_name = self.no_matching_site_found_str
1860  # Keep track of our global flag signifying that this
1861  # happened.
1862  self.all_sites_found = False
1863 
1864  # End of pick_a_site.
1865  return site_name
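# NOTE (illustrative sketch, not part of cmsHarvester.py): before any
# site is probed for the CMSSW version, pick_a_site narrows down the
# candidates: forbidden sites are dropped, a country code is translated
# to a site name, and a stated preference reduces the list to that one
# site (or to nothing). The sketch below covers only this filtering
# step, with a shortened site table. Unlike the method above it returns
# a new list instead of modifying its argument. The host
# "t2.example.org" is hypothetical.

```python
ALL_T1 = ["srm-cms.cern.ch", "ccsrm.in2p3.fr", "cmssrm.fnal.gov"]  # subset
COUNTRY_CODES = {"CAF": "caf.cern.ch",
                 "CH": "srm-cms.cern.ch",
                 "FR": "ccsrm.in2p3.fr"}  # subset
NO_PREFERENCE = "no preference"


def candidate_sites(sites, preferred_site=NO_PREFERENCE,
                    caf_access=False, non_t1access=False):
    """Return the sites that remain after applying the restrictions."""
    # Asking for the CAF implies CAF access.
    if preferred_site in ("CAF", "caf.cern.ch"):
        caf_access = True
    forbidden = []
    if not caf_access:
        forbidden.append("caf.cern.ch")
    if non_t1access:
        forbidden.extend(ALL_T1)
    remaining = [site for site in sites if site not in forbidden]
    # Translate a country code, if one was given.
    preferred_site = COUNTRY_CODES.get(preferred_site, preferred_site)
    if preferred_site != NO_PREFERENCE:
        remaining = [preferred_site] if preferred_site in remaining else []
    return remaining


def preferred_pool(sites):
    """T1 sites (and the CAF) are tried first, any other site otherwise."""
    t1_like = [site for site in sites
               if site in ALL_T1 or site == "caf.cern.ch"]
    return t1_like if t1_like else list(sites)
```

# Usage: with sites = ["caf.cern.ch", "srm-cms.cern.ch", "t2.example.org"],
# candidate_sites(sites, preferred_site="CH") gives ["srm-cms.cern.ch"].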
1866 
1867  ##########
1868 
1870 
1871  # Set up the command line parser. Note that we fix up the help
1872  # formatter so that we can add some text pointing people to
1873  # the Twiki etc.
1874  parser = optparse.OptionParser(version="%s %s" % \
1875  ("%prog", self.version),
1876  formatter=CMSHarvesterHelpFormatter())
1877 
1878  self.option_parser = parser
1879 
1880  # The debug switch.
1881  parser.add_option("-d", "--debug",
1882  help="Switch on debug mode",
1883  action="callback",
1884  callback=self.option_handler_debug)
1885 
1886  # The quiet switch.
1887  parser.add_option("-q", "--quiet",
1888  help="Be less verbose",
1889  action="callback",
1890  callback=self.option_handler_quiet)
1891 
1892  # The force switch. If this switch is used sanity checks are
1893  # performed but failures do not lead to aborts. Use with care.
1894  parser.add_option("", "--force",
1895  help="Force mode. Do not abort on sanity check "
1896  "failures",
1897  action="callback",
1898  callback=self.option_handler_force)
1899 
1900  # Choose between the different kinds of harvesting.
1901  parser.add_option("", "--harvesting_type",
1902  help="Harvesting type: %s" % \
1903  ", ".join(self.harvesting_types),
1904  action="callback",
1905  callback=self.option_handler_harvesting_type,
1906  type="string",
1907  metavar="HARVESTING_TYPE")
1908 
1909  # Choose between single-step and two-step mode.
1910  parser.add_option("", "--harvesting_mode",
1911  help="Harvesting mode: %s (default = %s)" % \
1912  (", ".join(self.harvesting_modes),
1913  self.harvesting_mode_default),
1914  action="callback",
1915  callback=self.option_handler_harvesting_mode,
1916  type="string",
1917  metavar="HARVESTING_MODE")
1918 
1919  # Override the GlobalTag chosen by the cmsHarvester.
1920  parser.add_option("", "--globaltag",
1921  help="GlobalTag to use. Default is the one " \
1922  "the dataset was created with for MC; for data " \
1923  "a GlobalTag has to be specified.",
1924  action="callback",
1925  callback=self.option_handler_globaltag,
1926  type="string",
1927  metavar="GLOBALTAG")
1928 
1929  # Allow switching off of reference histograms.
1930  parser.add_option("", "--no-ref-hists",
1931  help="Don't use any reference histograms",
1932  action="callback",
1933  callback=self.option_handler_no_ref_hists)
1934 
1935  # Allow the default (i.e. the one that should be used)
1936  # Frontier connection to be overridden.
1937  parser.add_option("", "--frontier-connection",
1938  help="Use this Frontier connection to find " \
1939  "GlobalTags and LocalTags (for reference " \
1940  "histograms).\nPlease only use this for " \
1941  "testing.",
1942  action="callback",
1943  callback=self.option_handler_frontier_connection,
1944  type="string",
1945  metavar="FRONTIER")
1946 
1947  # Similar to the above but specific to the Frontier connection
1948  # to be used for the GlobalTag.
1949  parser.add_option("", "--frontier-connection-for-globaltag",
1950  help="Use this Frontier connection to find " \
1951  "GlobalTags.\nPlease only use this for " \
1952  "testing.",
1953  action="callback",
1954  callback=self.option_handler_frontier_connection,
1955  type="string",
1956  metavar="FRONTIER")
1957 
1958  # Similar to the above but specific to the Frontier connection
1959  # to be used for the reference histograms.
1960  parser.add_option("", "--frontier-connection-for-refhists",
1961  help="Use this Frontier connection to find " \
1962  "LocalTags (for reference " \
1963  "histograms).\nPlease only use this for " \
1964  "testing.",
1965  action="callback",
1966  callback=self.option_handler_frontier_connection,
1967  type="string",
1968  metavar="FRONTIER")
1969 
1970  # Option to specify the name (or a regexp) of the dataset(s)
1971  # to be used.
1972  parser.add_option("", "--dataset",
1973  help="Name (or regexp) of dataset(s) to process",
1974  action="callback",
1975  #callback=self.option_handler_dataset_name,
1976  callback=self.option_handler_input_spec,
1977  type="string",
1978  #dest="self.input_name",
1979  metavar="DATASET")
1980 
1981  # Option to specify the name (or a regexp) of the dataset(s)
1982  # to be ignored.
1983  parser.add_option("", "--dataset-ignore",
1984  help="Name (or regexp) of dataset(s) to ignore",
1985  action="callback",
1986  callback=self.option_handler_input_spec,
1987  type="string",
1988  metavar="DATASET-IGNORE")
1989 
1990  # Option to specify the name (or a regexp) of the run(s)
1991  # to be used.
1992  parser.add_option("", "--runs",
1993  help="Run number(s) to process",
1994  action="callback",
1995  callback=self.option_handler_input_spec,
1996  type="string",
1997  metavar="RUNS")
1998 
1999  # Option to specify the name (or a regexp) of the run(s)
2000  # to be ignored.
2001  parser.add_option("", "--runs-ignore",
2002  help="Run number(s) to ignore",
2003  action="callback",
2004  callback=self.option_handler_input_spec,
2005  type="string",
2006  metavar="RUNS-IGNORE")
2007 
2008  # Option to specify a file containing a list of dataset names
2009  # (or regexps) to be used.
2010  parser.add_option("", "--datasetfile",
2011  help="File containing list of dataset names " \
2012  "(or regexps) to process",
2013  action="callback",
2014  #callback=self.option_handler_listfile_name,
2015  callback=self.option_handler_input_spec,
2016  type="string",
2017  #dest="self.input_name",
2018  metavar="DATASETFILE")
2019 
2020  # Option to specify a file containing a list of dataset names
2021  # (or regexps) to be ignored.
2022  parser.add_option("", "--datasetfile-ignore",
2023  help="File containing list of dataset names " \
2024  "(or regexps) to ignore",
2025  action="callback",
2026  callback=self.option_handler_input_spec,
2027  type="string",
2028  metavar="DATASETFILE-IGNORE")
2029 
2030  # Option to specify a file containing a list of runs to be
2031  # used.
2032  parser.add_option("", "--runslistfile",
2033  help="File containing list of run numbers " \
2034  "to process",
2035  action="callback",
2036  callback=self.option_handler_input_spec,
2037  type="string",
2038  metavar="RUNSLISTFILE")
2039 
2040  # Option to specify a file containing a list of runs
2041  # to be ignored.
2042  parser.add_option("", "--runslistfile-ignore",
2043  help="File containing list of run numbers " \
2044  "to ignore",
2045  action="callback",
2046  callback=self.option_handler_input_spec,
2047  type="string",
2048  metavar="RUNSLISTFILE-IGNORE")
2049 
2050  # Option to specify a JSON file containing a list of runs
2051  # to be used.
2052  parser.add_option("", "--Jsonrunfile",
2053  help="Jsonfile containing dictionary of run/lumisections pairs. " \
2054  "All lumisections of runs contained in dictionary are processed.",
2055  action="callback",
2056  callback=self.option_handler_input_Jsonrunfile,
2057  type="string",
2058  metavar="JSONRUNFILE")
2059 
2060  # Option to specify a JSON file containing a dictionary of run/lumisection pairs
2061  # to be used.
2062  parser.add_option("", "--Jsonfile",
2063  help="Jsonfile containing dictionary of run/lumisections pairs. " \
2064  "Only specified lumisections of runs contained in dictionary are processed.",
2065  action="callback",
2066  callback=self.option_handler_input_Jsonfile,
2067  type="string",
2068  metavar="JSONFILE")
2069 
2070  # Option to specify a ToDo file containing a list of runs
2071  # to be used.
2072  parser.add_option("", "--todo-file",
2073  help="Todo file containing a list of runs to process.",
2074  action="callback",
2075  callback=self.option_handler_input_todofile,
2076  type="string",
2077  metavar="TODO-FILE")
2078 
2079  # Option to specify which file to use for the dataset name to
2080  # reference histogram name mappings.
2081  parser.add_option("", "--refhistmappingfile",
2082  help="File to be used for the reference " \
2083  "histogram mappings. Default: `%s'." % \
2084  self.ref_hist_mappings_file_name_default,
2085  action="callback",
2086  callback=self.option_handler_ref_hist_mapping_file,
2087  type="string",
2088  metavar="REFHISTMAPPING-FILE")
2089 
2090  # Specify the place in CASTOR where the output should go.
2091  # NOTE: Only output to CASTOR is supported for the moment,
2092  # since the central DQM results place is on CASTOR anyway.
2093  parser.add_option("", "--castordir",
2094  help="Place on CASTOR to store results. " \
2095  "Default: `%s'." % \
2096  self.castor_base_dir_default,
2097  action="callback",
2098  callback=self.option_handler_castor_dir,
2099  type="string",
2100  metavar="CASTORDIR")
2101 
2102  # Use this to try and create jobs that will run without
2103  # special `t1access' role.
2104  parser.add_option("", "--no-t1access",
2105  help="Try to create jobs that will run " \
2106  "without special `t1access' role",
2107  action="callback",
2108  callback=self.option_handler_no_t1access)
2109 
2110  # Use this to create jobs that may run on CAF
2111  parser.add_option("", "--caf-access",
2112  help="Crab jobs may run " \
2113  "on CAF",
2114  action="callback",
2115  callback=self.option_handler_caf_access)
2116 
2117  # set process.dqmSaver.saveByLumiSection=1 in harvesting cfg file
2118  parser.add_option("", "--saveByLumiSection",
2119  help="set saveByLumiSection=1 in harvesting cfg file",
2120  action="callback",
2121  callback=self.option_handler_saveByLumiSection)
2122 
2123  # Use this to enable automatic creation and submission of crab jobs
2124  parser.add_option("", "--automatic-crab-submission",
2125  help="Crab jobs are created and " \
2126  "submitted automatically",
2127  action="callback",
2128  callback=self.option_handler_crab_submission)
2129 
2130  # Option to set the maximum number of sites each
2131  # job is submitted to.
2132  parser.add_option("", "--max-sites",
2133  help="Max. number of sites each job is submitted to",
2134  action="callback",
2135  callback=self.option_handler_sites,
2136  type="int")
2137 
2138  # Option to set the preferred site
2139  parser.add_option("", "--site",
2140  help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
2141  srm-cms.cern.ch : CH \
2142  ccsrm.in2p3.fr : FR \
2143  cmssrm-fzk.gridka.de : DE \
2144  cmssrm.fnal.gov : GOV \
2145  gridka-dCache.fzk.de : DE2 \
2146  rm-cms.gridpp.rl.ac.uk : UK \
2147  srm.grid.sinica.edu.tw : TW \
2148  srm2.grid.sinica.edu.tw : TW2 \
2149  srmcms.pic.es : ES \
2150  storm-fe-cms.cr.cnaf.infn.it : IT",
2151  action="callback",
2152  callback=self.option_handler_preferred_site,
2153  type="string")
2154 
2155  # This is the command line flag to list all harvesting
2156  # type-to-sequence mappings.
2157  parser.add_option("-l", "--list",
2158  help="List all harvesting types and their " \
2159  "corresponding sequence names",
2160  action="callback",
2161  callback=self.option_handler_list_types)
2162 
2163  # If nothing was specified: tell the user how to do things the
2164  # next time and exit.
2165  # NOTE: We just use the OptParse standard way of doing this by
2166  # acting as if a '--help' was specified.
2167  if len(self.cmd_line_opts) < 1:
2168  self.cmd_line_opts = ["--help"]
2169 
2170  # Some trickery with the options. Why? Well, since these
2171  # options change the output level immediately from the option
2172  # handlers, the results differ depending on where they are on
2173  # the command line. Let's just make sure they are at the
2174  # front.
2175  # NOTE: Not very efficient or sophisticated, but it works and
2176  # it only has to be done once anyway.
2177  for i in ["-d", "--debug",
2178  "-q", "--quiet"]:
2179  if i in self.cmd_line_opts:
2180  self.cmd_line_opts.remove(i)
2181  self.cmd_line_opts.insert(0, i)
2182 
2183  # Everything is set up, now parse what we were given.
2184  parser.set_defaults()
2185  (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2186 
2187  # End of parse_cmd_line_options.
2188 
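The option setup above relies on two optparse features: `action="callback"` options (with `type="string"` when the option takes an argument) and a pre-pass that moves the verbosity flags to the front of the argument list. The following is a minimal sketch of that pattern, not the harvester's own code. The `Settings` class, `move_to_front` and `parse` are hypothetical names introduced for illustration, and `move_to_front` is a simplified variant of the remove/insert loop used above.

```python
import optparse


class Settings(object):
    """Hypothetical stand-in for the object that owns the option handlers."""

    def __init__(self):
        self.debug = False
        self.harvesting_type = None

    def handle_debug(self, option, opt_str, value, parser):
        # Flag-style callback: no type given, so no argument is consumed.
        self.debug = True

    def handle_harvesting_type(self, option, opt_str, value, parser):
        # With type="string", optparse passes the option's argument as `value`.
        self.harvesting_type = value


def move_to_front(opts, flags):
    """Return a copy of `opts` with any of the given flags moved to the front."""
    front = [o for o in opts if o in flags]
    rest = [o for o in opts if o not in flags]
    return front + rest


def parse(cmd_line_opts):
    """Parse a list of command line options and return (settings, leftover args)."""
    settings = Settings()
    parser = optparse.OptionParser()
    parser.add_option("-d", "--debug",
                      help="Switch on debug mode",
                      action="callback",
                      callback=settings.handle_debug)
    parser.add_option("--harvesting_type",
                      help="Harvesting type",
                      action="callback",
                      callback=settings.handle_harvesting_type,
                      type="string",
                      metavar="HARVESTING_TYPE")
    # Handlers with immediate side effects (like the debug switch) run in
    # command line order, so put those flags first.
    opts = move_to_front(cmd_line_opts, ["-d", "--debug"])
    (options, args) = parser.parse_args(opts)
    return settings, args
```

For example, `parse(["--harvesting_type", "MC", "-d"])` handles `-d` before `--harvesting_type`, even though it was given last.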
2189  ##########
2190 
2191  def check_input_status(self):
2192  """Check completeness and correctness of input information.
2193 
2194  Check that all required information has been specified and
2195  that, at least as far as can be easily checked, it makes
2196  sense.
2197 
2198  NOTE: This is also where any default values are applied.
2199 
2200  """
2201 
2202  self.logger.info("Checking completeness/correctness of input...")
2203 
2204  # The cmsHarvester does not take (i.e. understand) any
2205  # arguments so there should not be any.
2206  if len(self.args) > 0:
2207  msg = "Sorry but I don't understand `%s'" % \
2208  (" ".join(self.args))
2209  self.logger.fatal(msg)
2210  raise Usage(msg)
2211 
2212  # BUG BUG BUG
2213  # While we wait for some bugs left and right to get fixed, we
2214  # disable two-step.
2215  if self.harvesting_mode == "two-step":
2216  msg = "--------------------\n" \
2217  " Sorry, but for the moment (well, till it works)" \
2218  " the two-step mode has been disabled.\n" \
2219  "--------------------\n"
2220  self.logger.fatal(msg)
2221  raise Error(msg)
2222  # BUG BUG BUG end
2223 
2224  # We need a harvesting method to be specified
2225  if self.harvesting_type is None:
2226  msg = "Please specify a harvesting type"
2227  self.logger.fatal(msg)
2228  raise Usage(msg)
2229  # as well as a harvesting mode.
2230  if self.harvesting_mode is None:
2231  self.harvesting_mode = self.harvesting_mode_default
2232  msg = "No harvesting mode specified --> using default `%s'" % \
2233  self.harvesting_mode
2234  self.logger.warning(msg)
2235  #raise Usage(msg)
2236 
2237  ###
2238 
2239  # We need an input method so we can find the dataset name(s).
2240  if self.input_method["datasets"]["use"] is None:
2241  msg = "Please specify an input dataset name " \
2242  "or a list file name"
2243  self.logger.fatal(msg)
2244  raise Usage(msg)
2245 
2246  # DEBUG DEBUG DEBUG
2247  # If we get here, we should also have an input name.
2248  assert not self.input_name["datasets"]["use"] is None
2249  # DEBUG DEBUG DEBUG end
2250 
2251  ###
2252 
2253  # The same holds for the reference histogram mapping file (if
2254  # we're using references).
2255  if self.use_ref_hists:
2256  if self.ref_hist_mappings_file_name is None:
2257  self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2258  msg = "No reference histogram mapping file specified --> " \
2259  "using default `%s'" % \
2260  self.ref_hist_mappings_file_name
2261  self.logger.warning(msg)
2262 
2263  ###
2264 
2265  # We need to know where to put the stuff (okay, the results)
2266  # on CASTOR.
2267  if self.castor_base_dir is None:
2268  self.castor_base_dir = self.castor_base_dir_default
2269  msg = "No CASTOR area specified --> using default `%s'" % \
2270  self.castor_base_dir
2271  self.logger.warning(msg)
2272  #raise Usage(msg)
2273 
2274  # Only the CERN CASTOR area is supported.
2275  if not self.castor_base_dir.startswith(self.castor_prefix):
2276  msg = "CASTOR area does not start with `%s'" % \
2277  self.castor_prefix
2278  self.logger.fatal(msg)
2279  if self.castor_base_dir.find("castor") > -1 and \
2280  not self.castor_base_dir.find("cern.ch") > -1:
2281  self.logger.fatal("Only CERN CASTOR is supported")
2282  raise Usage(msg)
2283 
2284  ###
2285 
2286  # TODO TODO TODO
2287  # This should be removed in the future, once I find out how to
2288  # get the config file used to create a given dataset from DBS.
2289 
2290  # For data we need to have a GlobalTag. (For MC we can figure
2291  # it out by ourselves.)
2292  if self.globaltag is None:
2293  self.logger.warning("No GlobalTag specified. This means I cannot")
2294  self.logger.warning("run on data, only on MC.")
2295  self.logger.warning("I will skip all data datasets.")
2296 
2297  # TODO TODO TODO end
2298 
2299  # Make sure the GlobalTag ends with `::All'.
2300  if not self.globaltag is None:
2301  if not self.globaltag.endswith("::All"):
2302  self.logger.warning("Specified GlobalTag `%s' does " \
2303  "not end in `::All' --> " \
2304  "appending this missing piece" % \
2305  self.globaltag)
2306  self.globaltag = "%s::All" % self.globaltag
2307 
2308  ###
2309 
2310  # Dump some info about the Frontier connections used.
2311  for (key, value) in self.frontier_connection_name.iteritems():
2312  frontier_type_str = "unknown"
2313  if key == "globaltag":
2314  frontier_type_str = "the GlobalTag"
2315  elif key == "refhists":
2316  frontier_type_str = "the reference histograms"
2317  non_str = None
2318  if self.frontier_connection_overridden[key] == True:
2319  non_str = "non-"
2320  else:
2321  non_str = ""
2322  self.logger.info("Using %sdefault Frontier " \
2323  "connection for %s: `%s'" % \
2324  (non_str, frontier_type_str, value))
2325 
2326  ###
2327 
2328  # End of check_input_status.
2329 
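Two of the checks above are pure string manipulations and can be looked at in isolation: appending the `::All` suffix to a GlobalTag and rejecting output areas outside the supported CASTOR prefix. A minimal sketch follows, under stated assumptions. The function names are hypothetical, the default prefix `/castor/cern.ch` is an assumption (the actual value of `self.castor_prefix` is defined elsewhere in the file), and `ValueError` stands in for the `Usage` exception used above.

```python
def normalize_globaltag(globaltag):
    """Return the GlobalTag with `::All' appended if it is missing.

    `None' (no GlobalTag specified) is passed through unchanged.
    """
    if globaltag is None:
        return None
    if not globaltag.endswith("::All"):
        return "%s::All" % globaltag
    return globaltag


def check_castor_dir(castor_dir, castor_prefix="/castor/cern.ch"):
    """Return castor_dir if it lies below castor_prefix, else raise.

    NOTE: The default prefix is an assumption made for this sketch.
    """
    if not castor_dir.startswith(castor_prefix):
        raise ValueError("CASTOR area does not start with `%s'" %
                         castor_prefix)
    return castor_dir
```

Keeping such checks free of side effects makes them easy to exercise without a full harvester instance.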
2330  ##########
2331 
2332  def check_cmssw(self):
2333  """Check if CMSSW is set up.
2334 
2335  """
2336 
2337  # Try to access the CMSSW_VERSION environment variable. If
2338  # it's something useful we consider CMSSW to be set up
2339  # properly. Otherwise we raise an error.
2340  cmssw_version = os.getenv("CMSSW_VERSION")
2341  if cmssw_version is None:
2342  self.logger.fatal("It seems CMSSW is not setup...")
2343  self.logger.fatal("($CMSSW_VERSION is empty)")
2344  raise Error("ERROR: CMSSW needs to be setup first!")
2345 
2346  self.cmssw_version = cmssw_version
2347  self.logger.info("Found CMSSW version %s properly set up" % \
2348  self.cmssw_version)
2349 
2350  # End of check_cmssw.
2351  return True
2352 
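The check above only tests for `None`, so a variable that is set but empty would pass even though the log message says the variable is empty. A small sketch of a stricter variant is given below. `require_env` is a hypothetical helper, not part of the harvester, and `RuntimeError` stands in for the `Error` class used above.

```python
import os


def require_env(name, environ=None):
    """Return the value of environment variable `name`, or raise.

    Both an unset variable and an empty string count as `not set up'.
    """
    if environ is None:
        environ = os.environ
    value = environ.get(name)
    if not value:
        raise RuntimeError("ERROR: %s needs to be set up first!" % name)
    return value
```

The optional `environ` argument only exists so the helper can be tried out with a plain dictionary instead of the real environment.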
2353  ##########
2354 
2355  def check_dbs(self):
2356  """Check if DBS is set up.
2357 
2358  """
2359 
2360  # Try to access the DBSCMD_HOME environment variable. If this
2361  # looks useful we consider DBS to be set up
2362  # properly. Otherwise we raise an error.
2363  dbs_home = os.getenv("DBSCMD_HOME")
2364  if dbs_home is None:
2365  self.logger.fatal("It seems DBS is not setup...")
2366  self.logger.fatal(" $DBSCMD_HOME is empty")
2367  raise Error("ERROR: DBS needs to be setup first!")
2368 
2369 ## # Now we try to do a very simple DBS search. If that works
2370 ## # instead of giving us the `Unsupported API call' crap, we
2371 ## # should be good to go.
2372 ## # NOTE: Not ideal, I know, but it reduces the amount of
2373 ## # complaints I get...
2374 ## cmd = "dbs search --query=\"find dataset where dataset = impossible\""
2375 ## (status, output) = commands.getstatusoutput(cmd)
2376 ## pdb.set_trace()
2377 ## if status != 0 or \
2378 ## output.lower().find("unsupported api call") > -1:
2379 ## self.logger.fatal("It seems DBS is not setup...")
2380 ## self.logger.fatal(" %s returns crap:" % cmd)
2381 ## for line in output.split("\n"):
2382 ## self.logger.fatal(" %s" % line)
2383 ## raise Error("ERROR: DBS needs to be setup first!")
2384 
2385  self.logger.debug("Found DBS properly set up")
2386 
2387  # End of check_dbs.
2388  return True
2389 
2390  ##########
2391 
2392  def setup_dbs(self):
2393  """Set up the Python side of DBS.
2394 
2395  For more information see the DBS Python API documentation:
2396  https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2397 
2398  """
2399 
2400  try:
2401  args={}
2402  args["url"]= "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2403  "servlet/DBSServlet"
2404  api = DbsApi(args)
2405  self.dbs_api = api
2406 
2407  except DBSAPI.dbsApiException.DbsApiException, ex:
2408  self.logger.fatal("Caught DBS API exception %s: %s " % \
2409  (ex.getClassName(), ex.getErrorMessage()))
2410  if ex.getErrorCode() not in (None, ""):
2411  self.logger.debug("DBS exception error code: %s" % ex.getErrorCode())
2412  raise
2413 
2414  # End of setup_dbs.
2415 
2416  ##########
2417 
2418  def dbs_resolve_dataset_name(self, dataset_name):
2419  """Use DBS to resolve a wildcarded dataset name.
2420 
2421  """
2422 
2423  # DEBUG DEBUG DEBUG
2424  # If we get here DBS should have been set up already.
2425  assert not self.dbs_api is None
2426  # DEBUG DEBUG DEBUG end
2427 
2428  # Some minor checking to make sure that whatever we've been
2429  # given as dataset name actually sounds like a dataset name.
2430  if not (dataset_name.startswith("/") and \
2431  dataset_name.endswith("RECO")):
2432  self.logger.warning("Dataset name `%s' does not sound " \
2433  "like a valid dataset name!" % \
2434  dataset_name)
2435 
2436  #----------
2437 
2438  api = self.dbs_api
2439  dbs_query = "find dataset where dataset like %s " \
2440  "and dataset.status = VALID" % \
2441  dataset_name
2442  try:
2443  api_result = api.executeQuery(dbs_query)
2444  except DBSAPI.dbsApiException.DbsApiException:
2445  msg = "ERROR: Could not execute DBS query"
2446  self.logger.fatal(msg)
2447  raise Error(msg)
2448 
2449  # Setup parsing.
2450  handler = DBSXMLHandler(["dataset"])
2451  parser = xml.sax.make_parser()
2452  parser.setContentHandler(handler)
2453 
2454  # Parse.
2455  try:
2456  xml.sax.parseString(api_result, handler)
2457  except SAXParseException:
2458  msg = "ERROR: Could not parse DBS server output"
2459  self.logger.fatal(msg)
2460  raise Error(msg)
2461 
2462  # DEBUG DEBUG DEBUG
2463  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2464  # DEBUG DEBUG DEBUG end
2465 
2466  # Extract the results.
2467  datasets = handler.results.values()[0]
2468 
2469  # End of dbs_resolve_dataset_name.
2470  return datasets
2471 
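The dbs_resolve_* methods all follow the same pattern: run a query, feed the XML answer to a SAX content handler, then read the collected values from `handler.results`. The sketch below illustrates that pattern with a self-contained handler. It is not the `DBSXMLHandler` defined elsewhere in this file: the class name `TagCollector`, the XML layout of `SAMPLE` and the validity rule are all assumptions made for illustration.

```python
import xml.sax
import xml.sax.handler


class TagCollector(xml.sax.handler.ContentHandler):
    """Collect the text content of the requested tags, one list per tag."""

    def __init__(self, tag_names):
        xml.sax.handler.ContentHandler.__init__(self)
        self.results = dict((name, []) for name in tag_names)
        self._current = None
        self._chunks = []

    def startElement(self, name, attrs):
        if name in self.results:
            self._current = name
            self._chunks = []

    def characters(self, content):
        # SAX may deliver the text of one element in several pieces.
        if self._current is not None:
            self._chunks.append(content)

    def endElement(self, name):
        if name == self._current:
            self.results[name].append("".join(self._chunks).strip())
            self._current = None

    def check_results_validity(self):
        # Assumed rule: every requested tag was seen equally often.
        lengths = set(len(v) for v in self.results.values())
        return len(lengths) <= 1


# Hypothetical server answer; the real DBS output format may differ.
SAMPLE = b"<results><row><run>3</run></row><row><run>1</run></row></results>"

handler = TagCollector(["run"])
xml.sax.parseString(SAMPLE, handler)
assert handler.check_results_validity()
# As in dbs_resolve_runs: the values arrive as strings.
runs = sorted(int(i) for i in handler.results["run"])
```

Note that `xml.sax.parseString` takes the handler directly, so a separate `xml.sax.make_parser()` object is not needed for this pattern.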
2472  ##########
2473 
2474  def dbs_resolve_cmssw_version(self, dataset_name):
2475  """Ask DBS for the CMSSW version used to create this dataset.
2476 
2477  """
2478 
2479  # DEBUG DEBUG DEBUG
2480  # If we get here DBS should have been set up already.
2481  assert not self.dbs_api is None
2482  # DEBUG DEBUG DEBUG end
2483 
2484  api = self.dbs_api
2485  dbs_query = "find algo.version where dataset = %s " \
2486  "and dataset.status = VALID" % \
2487  dataset_name
2488  try:
2489  api_result = api.executeQuery(dbs_query)
2490  except DBSAPI.dbsApiException.DbsApiException:
2491  msg = "ERROR: Could not execute DBS query"
2492  self.logger.fatal(msg)
2493  raise Error(msg)
2494 
2495  handler = DBSXMLHandler(["algo.version"])
2496  parser = xml.sax.make_parser()
2497  parser.setContentHandler(handler)
2498 
2499  try:
2500  xml.sax.parseString(api_result, handler)
2501  except SAXParseException:
2502  msg = "ERROR: Could not parse DBS server output"
2503  self.logger.fatal(msg)
2504  raise Error(msg)
2505 
2506  # DEBUG DEBUG DEBUG
2507  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2508  # DEBUG DEBUG DEBUG end
2509 
2510  cmssw_version = handler.results.values()[0]
2511 
2512  # DEBUG DEBUG DEBUG
2513  assert len(cmssw_version) == 1
2514  # DEBUG DEBUG DEBUG end
2515 
2516  cmssw_version = cmssw_version[0]
2517 
2518  # End of dbs_resolve_cmssw_version.
2519  return cmssw_version
2520 
2521  ##########
2522 
2523 ## def dbs_resolve_dataset_number_of_events(self, dataset_name):
2524 ## """Ask DBS across how many events this dataset has been spread
2525 ## out.
2526 
2527 ## This is especially useful to check that we do not submit a job
2528 ## supposed to run on a complete sample that is not contained at
2529 ## a single site.
2530 
2531 ## """
2532 
2533 ## # DEBUG DEBUG DEBUG
2534 ## # If we get here DBS should have been set up already.
2535 ## assert not self.dbs_api is None
2536 ## # DEBUG DEBUG DEBUG end
2537 
2538 ## api = self.dbs_api
2539 ## dbs_query = "find count(site) where dataset = %s " \
2540 ## "and dataset.status = VALID" % \
2541 ## dataset_name
2542 ## try:
2543 ## api_result = api.executeQuery(dbs_query)
2544 ## except DbsApiException:
2545 ## raise Error("ERROR: Could not execute DBS query")
2546 
2547 ## try:
2548 ## num_events = []
2549 ## class Handler(xml.sax.handler.ContentHandler):
2550 ## def startElement(self, name, attrs):
2551 ## if name == "result":
2552 ## num_events.append(str(attrs["COUNT_STORAGEELEMENT"]))
2553 ## xml.sax.parseString(api_result, Handler())
2554 ## except SAXParseException:
2555 ## raise Error("ERROR: Could not parse DBS server output")
2556 
2557 ## # DEBUG DEBUG DEBUG
2558 ## assert len(num_events) == 1
2559 ## # DEBUG DEBUG DEBUG end
2560 
2561 ## num_events = int(num_events[0])
2562 
2563 ## # End of dbs_resolve_dataset_number_of_events.
2564 ## return num_events
2565 
2566  ##########
2567 
2568  def dbs_resolve_runs(self, dataset_name):
2569  """Ask DBS for the list of runs in a given dataset.
2570 
2571  # NOTE: This does not (yet?) skip/remove empty runs. There is
2572  # a bug in the DBS entry run.numevents (i.e. it always returns
2573  # zero) which should be fixed in the `next DBS release'.
2574  # See also:
2575  # https://savannah.cern.ch/bugs/?53452
2576  # https://savannah.cern.ch/bugs/?53711
2577 
2578  """
2579 
2580  # TODO TODO TODO
2581  # We should remove empty runs as soon as the above mentioned
2582  # bug is fixed.
2583  # TODO TODO TODO end
2584 
2585  # DEBUG DEBUG DEBUG
2586  # If we get here DBS should have been set up already.
2587  assert not self.dbs_api is None
2588  # DEBUG DEBUG DEBUG end
2589 
2590  api = self.dbs_api
2591  dbs_query = "find run where dataset = %s " \
2592  "and dataset.status = VALID" % \
2593  dataset_name
2594  try:
2595  api_result = api.executeQuery(dbs_query)
2596  except DBSAPI.dbsApiException.DbsApiException:
2597  msg = "ERROR: Could not execute DBS query"
2598  self.logger.fatal(msg)
2599  raise Error(msg)
2600 
2601  handler = DBSXMLHandler(["run"])
2602  parser = xml.sax.make_parser()
2603  parser.setContentHandler(handler)
2604 
2605  try:
2606  xml.sax.parseString(api_result, handler)
2607  except SAXParseException:
2608  msg = "ERROR: Could not parse DBS server output"
2609  self.logger.fatal(msg)
2610  raise Error(msg)
2611 
2612  # DEBUG DEBUG DEBUG
2613  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2614  # DEBUG DEBUG DEBUG end
2615 
2616  runs = handler.results.values()[0]
2617  # Turn strings into integers.
2618  runs = [int(i) for i in runs]
2619  runs.sort()
2620 
2621  # End of dbs_resolve_runs.
2622  return runs
2623 
2624  ##########
2625 
2626  def dbs_resolve_globaltag(self, dataset_name):
2627  """Ask DBS for the globaltag corresponding to a given dataset.
2628 
2629  # BUG BUG BUG
2630  # This does not seem to work for data datasets? E.g. for
2631  # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
2632  # Probably due to the fact that the GlobalTag changed during
2633  # data taking...
2634  # BUG BUG BUG end
2635 
2636  """
2637 
2638  # DEBUG DEBUG DEBUG
2639  # If we get here DBS should have been set up already.
2640  assert not self.dbs_api is None
2641  # DEBUG DEBUG DEBUG end
2642 
2643  api = self.dbs_api
2644  dbs_query = "find dataset.tag where dataset = %s " \
2645  "and dataset.status = VALID" % \
2646  dataset_name
2647  try:
2648  api_result = api.executeQuery(dbs_query)
2649  except DBSAPI.dbsApiException.DbsApiException:
2650  msg = "ERROR: Could not execute DBS query"
2651  self.logger.fatal(msg)
2652  raise Error(msg)
2653 
2654  handler = DBSXMLHandler(["dataset.tag"])
2655  parser = xml.sax.make_parser()
2656  parser.setContentHandler(handler)
2657 
2658  try:
2659  xml.sax.parseString(api_result, handler)
2660  except SAXParseException:
2661  msg = "ERROR: Could not parse DBS server output"
2662  self.logger.fatal(msg)
2663  raise Error(msg)
2664 
2665  # DEBUG DEBUG DEBUG
2666  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2667  # DEBUG DEBUG DEBUG end
2668 
2669  globaltag = handler.results.values()[0]
2670 
2671  # DEBUG DEBUG DEBUG
2672  assert len(globaltag) == 1
2673  # DEBUG DEBUG DEBUG end
2674 
2675  globaltag = globaltag[0]
2676 
2677  # End of dbs_resolve_globaltag.
2678  return globaltag
2679 
2680  ##########
2681 
2682  def dbs_resolve_datatype(self, dataset_name):
2683  """Ask DBS for the data type (data or mc) of a given
2684  dataset.
2685 
2686  """
2687 
2688  # DEBUG DEBUG DEBUG
2689  # If we get here DBS should have been set up already.
2690  assert not self.dbs_api is None
2691  # DEBUG DEBUG DEBUG end
2692 
2693  api = self.dbs_api
2694  dbs_query = "find datatype.type where dataset = %s " \
2695  "and dataset.status = VALID" % \
2696  dataset_name
2697  try:
2698  api_result = api.executeQuery(dbs_query)
2699  except DBSAPI.dbsApiException.DbsApiException:
2700  msg = "ERROR: Could not execute DBS query"
2701  self.logger.fatal(msg)
2702  raise Error(msg)
2703 
2704  handler = DBSXMLHandler(["datatype.type"])
2705  parser = xml.sax.make_parser()
2706  parser.setContentHandler(handler)
2707 
2708  try:
2709  xml.sax.parseString(api_result, handler)
2710  except SAXParseException:
2711  msg = "ERROR: Could not parse DBS server output"
2712  self.logger.fatal(msg)
2713  raise Error(msg)
2714 
2715  # DEBUG DEBUG DEBUG
2716  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2717  # DEBUG DEBUG DEBUG end
2718 
2719  datatype = handler.results.values()[0]
2720 
2721  # DEBUG DEBUG DEBUG
2722  assert len(datatype) == 1
2723  # DEBUG DEBUG DEBUG end
2724 
2725  datatype = datatype[0]
2726 
2727  # End of dbs_resolve_datatype.
2728  return datatype
2729 
2730  ##########
2731 
2732  # OBSOLETE OBSOLETE OBSOLETE
2733  # This method is no longer used.
2734 
2735  def dbs_resolve_number_of_events(self, dataset_name, run_number=None):
2736  """Determine the number of events in a given dataset (and run).
2737 
2738  Ask DBS for the number of events in a dataset. If a run number
2739  is specified the number of events returned is that in that run
2740  of that dataset. If problems occur we throw an exception.
2741 
2742  # BUG BUG BUG
2743  # Since DBS does not return the number of events correctly,
2744  # neither for runs nor for whole datasets, we have to work
2745  # around that a bit...
2746  # BUG BUG BUG end
2747 
2748  """
2749 
2750  # DEBUG DEBUG DEBUG
2751  # If we get here DBS should have been set up already.
2752  assert not self.dbs_api is None
2753  # DEBUG DEBUG DEBUG end
2754 
2755  api = self.dbs_api
2756  dbs_query = "find file.name, file.numevents where dataset = %s " \
2757  "and dataset.status = VALID" % \
2758  dataset_name
2759  if not run_number is None:
2760  dbs_query = dbs_query + (" and run = %d" % run_number)
2761  try:
2762  api_result = api.executeQuery(dbs_query)
2763  except DBSAPI.dbsApiException.DbsApiException:
2764  msg = "ERROR: Could not execute DBS query"
2765  self.logger.fatal(msg)
2766  raise Error(msg)
2767 
2768  handler = DBSXMLHandler(["file.name", "file.numevents"])
2769  parser = xml.sax.make_parser()
2770  parser.setContentHandler(handler)
2771 
2772  try:
2773  xml.sax.parseString(api_result, handler)
2774  except SAXParseException:
2775  msg = "ERROR: Could not parse DBS server output"
2776  self.logger.fatal(msg)
2777  raise Error(msg)
2778 
2779  # DEBUG DEBUG DEBUG
2780  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2781  # DEBUG DEBUG DEBUG end
2782 
2783  num_events = sum(int(i) for i in handler.results["file.numevents"])
2784 
2785  # End of dbs_resolve_number_of_events.
2786  return num_events
2787 
2788  # OBSOLETE OBSOLETE OBSOLETE end
2789 
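All the DBS query strings used by the methods above share one shape: a list of fields, a dataset name, the VALID status requirement and an optional run number. A possible way to build them in one place is sketched below. `build_dbs_query` is a hypothetical helper that does not exist in this file, and the dataset name in the usage note is a made-up placeholder.

```python
def build_dbs_query(fields, dataset_name, run_number=None):
    """Build a DBS query string for the given fields and dataset.

    fields       -- list of field names, e.g. ["run"]
    dataset_name -- full dataset name
    run_number   -- optional run number to restrict the query to
    """
    query = "find %s where dataset = %s and dataset.status = VALID" % \
            (", ".join(fields), dataset_name)
    if run_number is not None:
        query += " and run = %d" % run_number
    return query
```

For example, `build_dbs_query(["file.name", "file.numevents"], "/A/B/RECO", 123)` yields the same string that dbs_resolve_number_of_events assembles by hand for run 123.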
2790  ##########
2791 
2792 ## def dbs_resolve_dataset_number_of_sites(self, dataset_name):
2793 ## """Ask DBS across how many sites this dataset has been spread
2794 ## out.
2795 
2796 ## This is especially useful to check that we do not submit a job
2797 ## supposed to run on a complete sample that is not contained at
2798 ## a single site.
2799 
2800 ## """
2801 
2802 ## # DEBUG DEBUG DEBUG
2803 ## # If we get here DBS should have been set up already.
2804 ## assert not self.dbs_api is None
2805 ## # DEBUG DEBUG DEBUG end
2806 
2807 ## api = self.dbs_api
2808 ## dbs_query = "find count(site) where dataset = %s " \
2809 ## "and dataset.status = VALID" % \
2810 ## dataset_name
2811 ## try:
2812 ## api_result = api.executeQuery(dbs_query)
2813 ## except DbsApiException:
2814 ## raise Error("ERROR: Could not execute DBS query")
2815 
2816 ## try:
2817 ## num_sites = []
2818 ## class Handler(xml.sax.handler.ContentHandler):
2819 ## def startElement(self, name, attrs):
2820 ## if name == "result":
2821 ## num_sites.append(str(attrs["COUNT_STORAGEELEMENT"]))
2822 ## xml.sax.parseString(api_result, Handler())
2823 ## except SAXParseException:
2824 ## raise Error("ERROR: Could not parse DBS server output")
2825 
2826 ## # DEBUG DEBUG DEBUG
2827 ## assert len(num_sites) == 1
2828 ## # DEBUG DEBUG DEBUG end
2829 
2830 ## num_sites = int(num_sites[0])
2831 
2832 ## # End of dbs_resolve_dataset_number_of_sites.
2833 ## return num_sites
2834 
2835  ##########
2836 
2837 ## def dbs_check_dataset_spread(self, dataset_name):
2838 ## """Figure out across how many sites this dataset is spread.
2839 
2840 ## NOTE: This is something we need to figure out per run, since
2841 ## we want to submit harvesting jobs per run.
2842 
2843 ## Basically three things can happen with a given dataset:
2844 ## - the whole dataset is available on a single site,
2845 ## - the whole dataset is available (mirrored) at multiple sites,
2846 ## - the dataset is spread across multiple sites and there is no
2847 ## single site containing the full dataset in one place.
2848 
2849 ## NOTE: If all goes well, it should not be possible that
2850 ## anything but a _full_ dataset is mirrored. So we ignore the
2851 ## possibility in which for example one site contains the full
2852 ## dataset and two others mirror half of it.
2853 ## ANOTHER NOTE: According to some people this last case _could_
2854 ## actually happen. I will not design for it, but make sure it
2855 ## ends up as a false negative, in which case we just lose some
2856 ## efficiency and treat the dataset (unnecessarily) as
2857 ## spread-out.
2858 
2859 ## We don't really care about the first two possibilities, but in
2860 ## the third case we need to make sure to run the harvesting in
2861 ## two-step mode.
2862 
2863 ## This method checks with DBS which of the above cases is true
2864 ## for the dataset name given, and returns a 1 for the first two
2865 ## cases, and the number of sites across which the dataset is
2866 ## spread for the third case.
2867 
2868 ## The way in which this is done is by asking how many files each
2869 ## site has for the dataset. In the first case there is only one
2870 ## site, in the second case all sites should have the same number
2871 ## of files (i.e. the total number of files in the dataset) and
2872 ## in the third case the file counts from all sites should add up
2873 ## to the total file count for the dataset.
2874 
2875 ## """
2876 
2877 ## # DEBUG DEBUG DEBUG
2878 ## # If we get here DBS should have been set up already.
2879 ## assert not self.dbs_api is None
2880 ## # DEBUG DEBUG DEBUG end
2881 
2882 ## api = self.dbs_api
2883 ## dbs_query = "find run, run.numevents, site, file.count " \
2884 ## "where dataset = %s " \
2885 ## "and dataset.status = VALID" % \
2886 ## dataset_name
2887 ## try:
2888 ## api_result = api.executeQuery(dbs_query)
2889 ## except DbsApiException:
2890 ## msg = "ERROR: Could not execute DBS query"
2891 ## self.logger.fatal(msg)
2892 ## raise Error(msg)
2893 
2894 ## # Index things by run number. No cross-check is done to make
2895 ## # sure we get results for each and every run in the
2896 ## # dataset. I'm not sure this would make sense since we'd be
2897 ## # cross-checking DBS info with DBS info anyway. Note that we
2898 ## # use the file count per site to see if we're dealing with an
2899 ## # incomplete vs. a mirrored dataset.
2900 ## sample_info = {}
2901 ## try:
2902 ## class Handler(xml.sax.handler.ContentHandler):
2903 ## def startElement(self, name, attrs):
2904 ## if name == "result":
2905 ## run_number = int(attrs["RUNS_RUNNUMBER"])
2906 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
2907 ## file_count = int(attrs["COUNT_FILES"])
2908 ## # BUG BUG BUG
2909 ## # Doh! For some reason DBS never returns any other
2910 ## # event count than zero.
2911 ## event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
2912 ## # BUG BUG BUG end
2913 ## info = (site_name, file_count, event_count)
2914 ## try:
2915 ## sample_info[run_number].append(info)
2916 ## except KeyError:
2917 ## sample_info[run_number] = [info]
2918 ## xml.sax.parseString(api_result, Handler())
2919 ## except SAXParseException:
2920 ## msg = "ERROR: Could not parse DBS server output"
2921 ## self.logger.fatal(msg)
2922 ## raise Error(msg)
2923 
2924 ## # Now translate this into a slightly more usable mapping.
2925 ## sites = {}
2926 ## for (run_number, site_info) in sample_info.iteritems():
2927 ## # Quick-n-dirty trick to see if all file counts are the
2928 ## # same.
2929 ## unique_file_counts = set([i[1] for i in site_info])
2930 ## if len(unique_file_counts) == 1:
2931 ## # Okay, so this must be a mirrored dataset.
2932 ## # We have to pick one but we have to be careful. We
2933 ## # cannot submit to things like a T0, a T1, or CAF.
2934 ## site_names = [self.pick_a_site([i[0] for i in site_info])]
2935 ## nevents = [site_info[0][2]]
2936 ## else:
2937 ## # Looks like this is a spread-out sample.
2938 ## site_names = [i[0] for i in site_info]
2939 ## nevents = [i[2] for i in site_info]
2940 ## sites[run_number] = zip(site_names, nevents)
2941 
2942 ## self.logger.debug("Sample `%s' spread is:" % dataset_name)
2943 ## run_numbers = sites.keys()
2944 ## run_numbers.sort()
2945 ## for run_number in run_numbers:
2946 ## self.logger.debug(" run # %6d: %d sites (%s)" % \
2947 ## (run_number,
2948 ## len(sites[run_number]),
2949 ## ", ".join([i[0] for i in sites[run_number]])))
2950 
2951 ## # End of dbs_check_dataset_spread.
2952 ## return sites
2953 
2954 ## # DEBUG DEBUG DEBUG
2955 ## # Just kept for debugging now.
2956 ## def dbs_check_dataset_spread_old(self, dataset_name):
2957 ## """Figure out across how many sites this dataset is spread.
2958 
2959 ## NOTE: This is something we need to figure out per run, since
2960 ## we want to submit harvesting jobs per run.
2961 
2962 ## Basically three things can happen with a given dataset:
2963 ## - the whole dataset is available on a single site,
2964 ## - the whole dataset is available (mirrored) at multiple sites,
2965 ## - the dataset is spread across multiple sites and there is no
2966 ## single site containing the full dataset in one place.
2967 
2968 ## NOTE: If all goes well, it should not be possible that
2969 ## anything but a _full_ dataset is mirrored. So we ignore the
2970 ## possibility in which for example one site contains the full
2971 ## dataset and two others mirror half of it.
2972 ## ANOTHER NOTE: According to some people this last case _could_
2973 ## actually happen. I will not design for it, but make sure it
2974 ## ends up as a false negative, in which case we just lose some
2975 ## efficiency and treat the dataset (unnecessarily) as
2976 ## spread-out.
2977 
2978 ## We don't really care about the first two possibilities, but in
2979 ## the third case we need to make sure to run the harvesting in
2980 ## two-step mode.
2981 
2982 ## This method checks with DBS which of the above cases is true
2983 ## for the dataset name given, and returns a 1 for the first two
2984 ## cases, and the number of sites across which the dataset is
2985 ## spread for the third case.
2986 
2987 ## The way in which this is done is by asking how many files each
2988 ## site has for the dataset. In the first case there is only one
2989 ## site, in the second case all sites should have the same number
2990 ## of files (i.e. the total number of files in the dataset) and
2991 ## in the third case the file counts from all sites should add up
2992 ## to the total file count for the dataset.
2993 
2994 ## """
2995 
2996 ## # DEBUG DEBUG DEBUG
2997 ## # If we get here DBS should have been set up already.
2998 ## assert not self.dbs_api is None
2999 ## # DEBUG DEBUG DEBUG end
3000 
3001 ## api = self.dbs_api
3002 ## dbs_query = "find run, run.numevents, site, file.count " \
3003 ## "where dataset = %s " \
3004 ## "and dataset.status = VALID" % \
3005 ## dataset_name
3006 ## try:
3007 ## api_result = api.executeQuery(dbs_query)
3008 ## except DbsApiException:
3009 ## msg = "ERROR: Could not execute DBS query"
3010 ## self.logger.fatal(msg)
3011 ## raise Error(msg)
3012 
3013 ## # Index things by run number. No cross-check is done to make
3014 ## # sure we get results for each and every run in the
3015 ## # dataset. I'm not sure this would make sense since we'd be
3016 ## # cross-checking DBS info with DBS info anyway. Note that we
3017 ## # use the file count per site to see if we're dealing with an
3018 ## # incomplete vs. a mirrored dataset.
3019 ## sample_info = {}
3020 ## try:
3021 ## class Handler(xml.sax.handler.ContentHandler):
3022 ## def startElement(self, name, attrs):
3023 ## if name == "result":
3024 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3025 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
3026 ## file_count = int(attrs["COUNT_FILES"])
3027 ## # BUG BUG BUG
3028 ## # Doh! For some reason DBS never returns any other
3029 ## # event count than zero.
3030 ## event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
3031 ## # BUG BUG BUG end
3032 ## info = (site_name, file_count, event_count)
3033 ## try:
3034 ## sample_info[run_number].append(info)
3035 ## except KeyError:
3036 ## sample_info[run_number] = [info]
3037 ## xml.sax.parseString(api_result, Handler())
3038 ## except SAXParseException:
3039 ## msg = "ERROR: Could not parse DBS server output"
3040 ## self.logger.fatal(msg)
3041 ## raise Error(msg)
3042 
3043 ## # Now translate this into a slightly more usable mapping.
3044 ## sites = {}
3045 ## for (run_number, site_info) in sample_info.iteritems():
3046 ## # Quick-n-dirty trick to see if all file counts are the
3047 ## # same.
3048 ## unique_file_counts = set([i[1] for i in site_info])
3049 ## if len(unique_file_counts) == 1:
3050 ## # Okay, so this must be a mirrored dataset.
3051 ## # We have to pick one but we have to be careful. We
3052 ## # cannot submit to things like a T0, a T1, or CAF.
3053 ## site_names = [self.pick_a_site([i[0] for i in site_info])]
3054 ## nevents = [site_info[0][2]]
3055 ## else:
3056 ## # Looks like this is a spread-out sample.
3057 ## site_names = [i[0] for i in site_info]
3058 ## nevents = [i[2] for i in site_info]
3059 ## sites[run_number] = zip(site_names, nevents)
3060 
3061 ## self.logger.debug("Sample `%s' spread is:" % dataset_name)
3062 ## run_numbers = sites.keys()
3063 ## run_numbers.sort()
3064 ## for run_number in run_numbers:
3065 ## self.logger.debug(" run # %6d: %d site(s) (%s)" % \
3066 ## (run_number,
3067 ## len(sites[run_number]),
3068 ## ", ".join([i[0] for i in sites[run_number]])))
3069 
3070 ## # End of dbs_check_dataset_spread_old.
3071 ## return sites
3072 ## # DEBUG DEBUG DEBUG end
3073 
3074  ##########
3075 
3076  def dbs_check_dataset_spread(self, dataset_name):
3077  """Figure out, per run, how this dataset is spread across sites.
3078 
3079  Returns the event counts per run (total and per site) and a `mirrored'
3080  flag; more efficient than calling dbs_resolve_number_of_events per run.
3081 
3082  """
3083 
3084  self.logger.debug("Checking spread of dataset `%s'" % dataset_name)
3085 
3086  # DEBUG DEBUG DEBUG
3087  # If we get here DBS should have been set up already.
3088  assert self.dbs_api is not None
3089  # DEBUG DEBUG DEBUG end
3090 
3091  api = self.dbs_api
3092  dbs_query = "find run.number, site, file.name, file.numevents " \
3093  "where dataset = %s " \
3094  "and dataset.status = VALID" % \
3095  dataset_name
3096  try:
3097  api_result = api.executeQuery(dbs_query)
3098  except DBSAPI.dbsApiException.DbsApiException:
3099  msg = "ERROR: Could not execute DBS query"
3100  self.logger.fatal(msg)
3101  raise Error(msg)
3102 
3103  handler = DBSXMLHandler(["run.number", "site", "file.name", "file.numevents"])
3104  parser = xml.sax.make_parser()
3105  parser.setContentHandler(handler)
3106 
3107  try:
3108  # OBSOLETE OBSOLETE OBSOLETE
3109 ## class Handler(xml.sax.handler.ContentHandler):
3110 ## def startElement(self, name, attrs):
3111 ## if name == "result":
3112 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
3113 ## # TODO TODO TODO
3114 ## # Ugly hack to get around cases like this:
3115 ## # $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3116 ## # Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3117 ## # Processing ... \
3118 ## # PATH STORAGEELEMENT_SENAME COUNT_FILES
3119 ## # _________________________________________________________________________________
3120 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO 1
3121 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO cmssrm.fnal.gov 12
3122 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO srm-cms.cern.ch 12
3123 ## if len(site_name) < 1:
3124 ## return
3125 ## # TODO TODO TODO end
3126 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3127 ## file_name = str(attrs["FILES_LOGICALFILENAME"])
3128 ## nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3129 ## # I know, this is a bit of a kludge.
3130 ## if not files_info.has_key(run_number):
3131 ## # New run.
3132 ## files_info[run_number] = {}
3133 ## files_info[run_number][file_name] = (nevents,
3134 ## [site_name])
3135 ## elif not files_info[run_number].has_key(file_name):
3136 ## # New file for a known run.
3137 ## files_info[run_number][file_name] = (nevents,
3138 ## [site_name])
3139 ## else:
3140 ## # New entry for a known file for a known run.
3141 ## # DEBUG DEBUG DEBUG
3142 ## # Each file should have the same number of
3143 ## # events independent of the site it's at.
3144 ## assert nevents == files_info[run_number][file_name][0]
3145 ## # DEBUG DEBUG DEBUG end
3146 ## files_info[run_number][file_name][1].append(site_name)
3147  # OBSOLETE OBSOLETE OBSOLETE end
3148  xml.sax.parseString(api_result, handler)
3149  except SAXParseException:
3150  msg = "ERROR: Could not parse DBS server output"
3151  self.logger.fatal(msg)
3152  raise Error(msg)
3153 
3154  # DEBUG DEBUG DEBUG
3155  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
3156  # DEBUG DEBUG DEBUG end
3157 
3158  # Now reshuffle all results a bit so we can more easily use
3159  # them later on. (Remember that all arrays in the results
3160  # should have equal length.)
3161  files_info = {}
3162  for (index, site_name) in enumerate(handler.results["site"]):
3163  # Ugly hack to get around cases like this:
3164  # $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3165  # Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3166  # Processing ... \
3167  # PATH STORAGEELEMENT_SENAME COUNT_FILES
3168  # _________________________________________________________________________________
3169  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO 1
3170  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO cmssrm.fnal.gov 12
3171  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO srm-cms.cern.ch 12
3172  if len(site_name) < 1:
3173  continue
3174  run_number = int(handler.results["run.number"][index])
3175  file_name = handler.results["file.name"][index]
3176  nevents = int(handler.results["file.numevents"][index])
3177 
3178  # I know, this is a bit of a kludge.
3179  if run_number not in files_info:
3180  # New run.
3181  files_info[run_number] = {}
3182  files_info[run_number][file_name] = (nevents,
3183  [site_name])
3184  elif file_name not in files_info[run_number]:
3185  # New file for a known run.
3186  files_info[run_number][file_name] = (nevents,
3187  [site_name])
3188  else:
3189  # New entry for a known file for a known run.
3190  # DEBUG DEBUG DEBUG
3191  # Each file should have the same number of
3192  # events independent of the site it's at.
3193  assert nevents == files_info[run_number][file_name][0]
3194  # DEBUG DEBUG DEBUG end
3195  files_info[run_number][file_name][1].append(site_name)
3196 
3197  # Remove any information for files that are not available
3198  # anywhere. NOTE: After introducing the ugly hack above, this
3199  # is a bit redundant, but let's keep it for the moment.
3200  for run_number in files_info.keys():
3201  files_without_sites = [i for (i, j) in \
3202  files_info[run_number].items() \
3203  if len(j[1]) < 1]
3204  if len(files_without_sites) > 0:
3205  self.logger.warning("Removing %d file(s)" \
3206  " with empty site names" % \
3207  len(files_without_sites))
3208  for file_name in files_without_sites:
3209  del files_info[run_number][file_name]
3210  # files_info[run_number][file_name] = (files_info \
3211  # [run_number] \
3212  # [file_name][0], [])
3213 
3214  # And another bit of a kludge.
3215  num_events_catalog = {}
3216  for run_number in files_info.keys():
3217  site_names = list(set([j for i in files_info[run_number].values() for j in i[1]]))
3218 
3219  # NOTE: The term `mirrored' does not have the usual
3220  # meaning here. It basically means that we can apply
3221  # single-step harvesting.
3222  mirrored = None
3223  if len(site_names) > 1:
3224  # Now we somehow need to figure out if we're dealing
3225  # with a mirrored or a spread-out dataset. The rule we
3226  # use here is that we're dealing with a spread-out
3227  # dataset unless we can find at least one site
3228  # containing exactly the full list of files for this
3229  # dataset that DBS knows about. In that case we just
3230  # use only that site.
3231  all_file_names = files_info[run_number].keys()
3232  all_file_names = set(all_file_names)
3233  sites_with_complete_copies = []
3234  for site_name in site_names:
3235  files_at_site = [i for (i, (j, k)) \
3236  in files_info[run_number].items() \
3237  if site_name in k]
3238  files_at_site = set(files_at_site)
3239  if files_at_site == all_file_names:
3240  sites_with_complete_copies.append(site_name)
3241  if len(sites_with_complete_copies) < 1:
3242  # This dataset/run is available at more than one
3243  # site, but no one has a complete copy. So this is
3244  # a spread-out sample.
3245  mirrored = False
3246  else:
3247  if len(sites_with_complete_copies) > 1:
3248  # This sample is available (and complete) at
3249  # more than one site. Definitely mirrored.
3250  mirrored = True
3251  else:
3252  # This dataset/run is available at more than
3253  # one site and at least one of them has a
3254  # complete copy. Even if this is only a single
3255  # site, let's call this `mirrored' and run the
3256  # single-step harvesting.
3257  mirrored = True
3258 
3259 ## site_names_ref = set(files_info[run_number].values()[0][1])
3260 ## for site_names_tmp in files_info[run_number].values()[1:]:
3261 ## if set(site_names_tmp[1]) != site_names_ref:
3262 ## mirrored = False
3263 ## break
3264 
3265  if mirrored:
3266  self.logger.debug(" -> run appears to be `mirrored'")
3267  else:
3268  self.logger.debug(" -> run appears to be spread-out")
3269 
3270  if mirrored and \
3271  len(sites_with_complete_copies) != len(site_names):
3272  # Remove any references to incomplete sites if we
3273  # have at least one complete site (and if there
3274  # are incomplete sites).
3275  for (file_name, (i, sites)) in files_info[run_number].items():
3276  complete_sites = [site for site in sites \
3277  if site in sites_with_complete_copies]
3278  files_info[run_number][file_name] = (i, complete_sites)
3279  site_names = sites_with_complete_copies
3280 
3281  self.logger.debug(" for run #%d:" % run_number)
3282  num_events_catalog[run_number] = {}
3283  num_events_catalog[run_number]["all_sites"] = sum([i[0] for i in files_info[run_number].values()])
3284  if len(site_names) < 1:
3285  self.logger.debug(" run is not available at any site")
3286  self.logger.debug(" (but should contain %d events)" % \
3287  num_events_catalog[run_number]["all_sites"])
3288  else:
3289  self.logger.debug(" at all sites combined there are %d events" % \
3290  num_events_catalog[run_number]["all_sites"])
3291  for site_name in site_names:
3292  num_events_catalog[run_number][site_name] = sum([i[0] for i in files_info[run_number].values() if site_name in i[1]])
3293  self.logger.debug(" at site `%s' there are %d events" % \
3294  (site_name, num_events_catalog[run_number][site_name]))
3295  num_events_catalog[run_number]["mirrored"] = mirrored
3296 
3297  # End of dbs_check_dataset_spread.
3298  return num_events_catalog
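# NOTE: Illustrative sketch, not part of cmsHarvester. It condenses the
# `mirrored' versus spread-out rule described in the comments above: a run can
# be harvested in a single step if at least one site holds every file that is
# known for that run. The function name classify_run(), the returned keys
# "complete_sites" and "single_step", and the site and file names are all
# hypothetical. Unlike the method above, this sketch also reports a run that
# lives at exactly one site as single-step instead of leaving the flag unset.

```python
def classify_run(files_info_for_run):
    """Classify one run.

    files_info_for_run maps file name -> (nevents, [site names]), i.e. the
    same shape as files_info[run_number] in the method above.
    """
    all_files = set(files_info_for_run)
    site_names = set(site
                     for (_, sites) in files_info_for_run.values()
                     for site in sites)
    # A site is complete if the set of files it hosts equals the full set.
    complete_sites = sorted(
        site for site in site_names
        if set(name
               for (name, (_, sites)) in files_info_for_run.items()
               if site in sites) == all_files)
    total_events = sum(n for (n, _) in files_info_for_run.values())
    return {"all_sites": total_events,
            "complete_sites": complete_sites,
            "single_step": len(complete_sites) > 0}


# Hypothetical inputs, for illustration only.
mirrored = {"a.root": (10, ["T2_A", "T2_B"]),
            "b.root": (20, ["T2_A", "T2_B"])}
spread = {"a.root": (10, ["T2_A"]),
          "b.root": (20, ["T2_B"])}
partial = {"a.root": (10, ["T2_A", "T2_B"]),
           "b.root": (20, ["T2_A"])}
```

# In the `partial' case only T2_A holds both files, so only T2_A is kept as a
# candidate site, which mirrors the removal of incomplete sites above.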
3299 
3300  # Beginning of old version.
3301 ## def dbs_check_dataset_num_events(self, dataset_name):
3302 ## """Figure out the number of events in each run of this dataset.
3303 
3304 ## This is a more efficient way of doing this than calling
3305 ## dbs_resolve_number_of_events for each run.
3306 
3307 ## # BUG BUG BUG
3308 ## # This might very well not work at all for spread-out samples. (?)
3309 ## # BUG BUG BUG end
3310 
3311 ## """
3312 
3313 ## # DEBUG DEBUG DEBUG
3314 ## # If we get here DBS should have been set up already.
3315 ## assert not self.dbs_api is None
3316 ## # DEBUG DEBUG DEBUG end
3317 
3318 ## api = self.dbs_api
3319 ## dbs_query = "find run.number, file.name, file.numevents where dataset = %s " \
3320 ## "and dataset.status = VALID" % \
3321 ## dataset_name
3322 ## try:
3323 ## api_result = api.executeQuery(dbs_query)
3324 ## except DbsApiException:
3325 ## msg = "ERROR: Could not execute DBS query"
3326 ## self.logger.fatal(msg)
3327 ## raise Error(msg)
3328 
3329 ## try:
3330 ## files_info = {}
3331 ## class Handler(xml.sax.handler.ContentHandler):
3332 ## def startElement(self, name, attrs):
3333 ## if name == "result":
3334 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3335 ## file_name = str(attrs["FILES_LOGICALFILENAME"])
3336 ## nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3337 ## try:
3338 ## files_info[run_number][file_name] = nevents
3339 ## except KeyError:
3340 ## files_info[run_number] = {file_name: nevents}
3341 ## xml.sax.parseString(api_result, Handler())
3342 ## except SAXParseException:
3343 ## msg = "ERROR: Could not parse DBS server output"
3344 ## self.logger.fatal(msg)
3345 ## raise Error(msg)
3346 
3347 ## num_events_catalog = {}
3348 ## for run_number in files_info.keys():
3349 ## num_events_catalog[run_number] = sum(files_info[run_number].values())
3350 
3351 ## # End of dbs_check_dataset_num_events.
3352 ## return num_events_catalog
3353  # End of old version.
3354 
3355  ##########
3356 
3357  def build_dataset_list(self, input_method, input_name):
3358  """Build a list of all datasets to be processed.
3359 
3360  """
3361 
3362  dataset_names = []
3363 
3364  # It may be, but only for the list of datasets to ignore, that
3365  # the input method and name are None because nothing was
3366  # specified. In that case just an empty list is returned.
3367  if input_method is None:
3368  pass
3369  elif input_method == "dataset":
3370  # Input comes from a dataset name directly on the command
3371  # line. But, this can also contain wildcards so we need
3372  # DBS to translate it conclusively into a list of explicit
3373  # dataset names.
3374  self.logger.info("Asking DBS for dataset names")
3375  dataset_names = self.dbs_resolve_dataset_name(input_name)
3376  elif input_method == "datasetfile":
3377  # In this case a file containing a list of dataset names
3378  # is specified. Still, each line may contain wildcards so
3379  # this step also needs help from DBS.
3380  # NOTE: Lines starting with a `#' are ignored.
3381  self.logger.info("Reading input from list file `%s'" % \
3382  input_name)
3383  try:
3384  listfile = open("/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name, "r")
3385  self.logger.debug("Opened list file")
3386  for dataset in listfile:
3387  # Skip empty lines.
3388  dataset_stripped = dataset.strip()
3389  if len(dataset_stripped) < 1:
3390  continue
3391  # Skip lines starting with a `#'.
3392  if dataset_stripped[0] != "#":
3393  dataset_names.extend(self. \
3394  dbs_resolve_dataset_name(dataset_stripped))
3395  listfile.close()
3396  except IOError:
3397  msg = "ERROR: Could not open input list file `%s'" % \
3398  input_name
3399  self.logger.fatal(msg)
3400  raise Error(msg)
3401  else:
3402  # DEBUG DEBUG DEBUG
3403  # We should never get here.
3404  assert False, "Unknown input method `%s'" % input_method
3405  # DEBUG DEBUG DEBUG end
3406 
3407  # Remove duplicates from the dataset list.
3408  # NOTE: There should not be any duplicates in any list coming
3409  # from DBS, but maybe the user provided a list file with less
3410  # care.
3411  dataset_names = list(set(dataset_names))
3412 
3413  # Sort to get a reproducible order.
3414  dataset_names.sort()
3415 
3416  # End of build_dataset_list.
3417  return dataset_names
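# NOTE: Illustrative sketch, not part of cmsHarvester. It isolates the
# list-file conventions used above: empty lines and lines starting with `#'
# are skipped, and the result is deduplicated and sorted. The helper name
# read_list_lines() and the dataset names are hypothetical, and the DBS
# wildcard expansion done by dbs_resolve_dataset_name() is deliberately left
# out.

```python
def read_list_lines(lines):
    """Return the non-empty, non-comment entries, deduplicated and sorted."""
    entries = []
    for line in lines:
        stripped = line.strip()
        # Skip empty lines.
        if len(stripped) < 1:
            continue
        # Skip lines starting with a `#'.
        if stripped[0] != "#":
            entries.append(stripped)
    return sorted(set(entries))


# Hypothetical list-file contents, for illustration only.
example_lines = ["/A/B/RECO\n",
                 "\n",
                 "# a comment\n",
                 "  /C/D/RECO  \n",
                 "/A/B/RECO\n"]
datasets = read_list_lines(example_lines)
```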
3418 
3419  ##########
3420 
3421  def build_dataset_use_list(self):
3422  """Build a list of datasets to process.
3423 
3424  """
3425 
3426  self.logger.info("Building list of datasets to consider...")
3427 
3428  input_method = self.input_method["datasets"]["use"]
3429  input_name = self.input_name["datasets"]["use"]
3430  dataset_names = self.build_dataset_list(input_method,
3431  input_name)
3432  self.datasets_to_use = dict(zip(dataset_names,
3433  [None] * len(dataset_names)))
3434 
3435  self.logger.info(" found %d dataset(s) to process:" % \
3436  len(dataset_names))
3437  for dataset in dataset_names:
3438  self.logger.info(" `%s'" % dataset)
3439 
3440  # End of build_dataset_use_list.
3441 
3442  ##########
3443 
3444  def build_dataset_ignore_list(self):
3445  """Build a list of datasets to ignore.
3446 
3447  NOTE: We should always have a list of datasets to process, but
3448  it may be that we don't have a list of datasets to ignore.
3449 
3450  """
3451 
3452  self.logger.info("Building list of datasets to ignore...")
3453 
3454  input_method = self.input_method["datasets"]["ignore"]
3455  input_name = self.input_name["datasets"]["ignore"]
3456  dataset_names = self.build_dataset_list(input_method,
3457  input_name)
3458  self.datasets_to_ignore = dict(zip(dataset_names,
3459  [None] * len(dataset_names)))
3460 
3461  self.logger.info(" found %d dataset(s) to ignore:" % \
3462  len(dataset_names))
3463  for dataset in dataset_names:
3464  self.logger.info(" `%s'" % dataset)
3465 
3466  # End of build_dataset_ignore_list.
3467 
3468  ##########
3469 
3470  def build_runs_list(self, input_method, input_name):
3471 
3472  runs = []
3473 
3474  # A list of runs (either to use or to ignore) is not
3475  # required. This protects against `empty cases.'
3476  if input_method is None:
3477  pass
3478  elif input_method == "runs":
3479  # A list of runs was specified directly from the command
3480  # line.
3481  self.logger.info("Reading list of runs from the " \
3482  "command line")
3483  runs.extend([int(i.strip()) \
3484  for i in input_name.split(",") \
3485  if len(i.strip()) > 0])
3486  elif input_method == "runslistfile":
3487  # We were passed a file containing a list of runs.
3488  self.logger.info("Reading list of runs from file `%s'" % \
3489  input_name)
3490  try:
3491  listfile = open(input_name, "r")
3492  for run in listfile:
3493  # Skip empty lines.
3494  run_stripped = run.strip()
3495  if len(run_stripped) < 1:
3496  continue
3497  # Skip lines starting with a `#'.
3498  if run_stripped[0] != "#":
3499  runs.append(int(run_stripped))
3500  listfile.close()
3501  except IOError:
3502  msg = "ERROR: Could not open input list file `%s'" % \
3503  input_name
3504  self.logger.fatal(msg)
3505  raise Error(msg)
3506 
3507  else:
3508  # DEBUG DEBUG DEBUG
3509  # We should never get here.
3510  assert False, "Unknown input method `%s'" % input_method
3511  # DEBUG DEBUG DEBUG end
3512 
3513  # Remove duplicates, sort and done.
3514  runs = sorted(set(runs))
3515 
3516  # End of build_runs_list().
3517  return runs
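# NOTE: Illustrative sketch, not part of cmsHarvester. It shows how a
# comma-separated run list given on the command line is turned into unique,
# sorted integers, tolerating stray spaces and empty fields. The helper name
# parse_runs() is hypothetical.

```python
def parse_runs(spec):
    """Turn a string such as "3, 1,,2" into a sorted list of unique ints."""
    return sorted(set(int(i.strip())
                      for i in spec.split(",")
                      if len(i.strip()) > 0))


runs_example = parse_runs("3, 1,,2, 3")
```

# Note that a non-numeric field makes int() raise ValueError; the sketch does
# not try to recover from that.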
3518 
3519  ##########
3520 
3521  def build_runs_use_list(self):
3522  """Build a list of runs to process.
3523 
3524  """
3525 
3526  self.logger.info("Building list of runs to consider...")
3527 
3528  input_method = self.input_method["runs"]["use"]
3529  input_name = self.input_name["runs"]["use"]
3530  runs = self.build_runs_list(input_method, input_name)
3531  self.runs_to_use = dict(zip(runs, [None] * len(runs)))
3532 
3533  self.logger.info(" found %d run(s) to process:" % \
3534  len(runs))
3535  if len(runs) > 0:
3536  self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3537 
3538  # End of build_runs_use_list().
3539 
3540  ##########
3541 
3542  def build_runs_ignore_list(self):
3543  """Build a list of runs to ignore.
3544 
3545  NOTE: We should always have a list of runs to process, but
3546  it may be that we don't have a list of runs to ignore.
3547 
3548  """
3549 
3550  self.logger.info("Building list of runs to ignore...")
3551 
3552  input_method = self.input_method["runs"]["ignore"]
3553  input_name = self.input_name["runs"]["ignore"]
3554  runs = self.build_runs_list(input_method, input_name)
3555  self.runs_to_ignore = dict(zip(runs, [None] * len(runs)))
3556 
3557  self.logger.info(" found %d run(s) to ignore:" % \
3558  len(runs))
3559  if len(runs) > 0:
3560  self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3561 
3562  # End of build_runs_ignore_list().
3563 
3564  ##########
3565 
3566  def process_dataset_ignore_list(self):
3567  """Update the list of datasets taking into account the ones to
3568  ignore.
3569 
3570  Both lists have been generated before from DBS and both are
3571  assumed to be unique.
3572 
3573  NOTE: The advantage of creating the ignore list from DBS (in
3574  case a regexp is given) and matching that instead of directly
3575  matching the ignore criterion against the list of datasets (to
3576  consider) built from DBS is that in the former case we're sure
3577  that all regexps are treated exactly as DBS would have done
3578  without the cmsHarvester.
3579 
3580  NOTE: This only removes complete samples. Exclusion of single
3581  runs is done by the book keeping. So the assumption is that a
3582  user never wants to harvest just part (i.e. n out of N runs)
3583  of a sample.
3584 
3585  """
3586 
3587  self.logger.info("Processing list of datasets to ignore...")
3588 
3589  self.logger.debug("Before processing ignore list there are %d " \
3590  "datasets in the list to be processed" % \
3591  len(self.datasets_to_use))
3592 
3593  # Simple approach: just loop and search.
3594  dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3595  for dataset_name in self.datasets_to_use.keys():
3596  if dataset_name in self.datasets_to_ignore.keys():
3597  del dataset_names_filtered[dataset_name]
3598 
3599  self.logger.info(" --> Removed %d dataset(s)" % \
3600  (len(self.datasets_to_use) -
3601  len(dataset_names_filtered)))
3602 
3603  self.datasets_to_use = dataset_names_filtered
3604 
3605  self.logger.debug("After processing ignore list there are %d " \
3606  "datasets in the list to be processed" % \
3607  len(self.datasets_to_use))
3608 
3609  # End of process_dataset_ignore_list.
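# NOTE: Illustrative sketch, not part of cmsHarvester. It expresses the
# "loop and search" filtering above as a single pass that builds a new
# dictionary, so no deep copy is needed. The helper name drop_ignored() and
# the dataset names are hypothetical.

```python
def drop_ignored(to_use, to_ignore):
    """Return a copy of to_use without the keys that appear in to_ignore."""
    return dict((name, value)
                for (name, value) in to_use.items()
                if name not in to_ignore)


# Hypothetical dataset dictionaries, for illustration only.
use = {"/A/B/RECO": None, "/C/D/RECO": None, "/E/F/RECO": None}
ignore = {"/C/D/RECO": None, "/X/Y/RECO": None}
kept = drop_ignored(use, ignore)
num_removed = len(use) - len(kept)
```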
3610 
3611  ##########
3612 
3614 
3615  self.logger.info("Processing list of runs to use and ignore...")
3616 
3617  # This basically adds all runs in a dataset to be processed,
3618  # except for any runs that are not specified in the `to use'
3619  # list and any runs that are specified in the `to ignore'
3620  # list.
3621 
3622  # NOTE: It is assumed that those lists make sense. The input
3623  # should be checked against e.g. overlapping `use' and
3624  # `ignore' lists.
3625 
3626  runs_to_use = self.runs_to_use
3627  runs_to_ignore = self.runs_to_ignore
3628 
3629  for dataset_name in self.datasets_to_use:
3630  runs_in_dataset = self.datasets_information[dataset_name]["runs"]
3631 
3632  # First some sanity checks.
3633  runs_to_use_tmp = []
3634  for run in runs_to_use:
3635  if not run in runs_in_dataset:
3636  self.logger.warning("Dataset `%s' does not contain " \
3637  "requested run %d " \
3638  "--> ignoring `use' of this run" % \
3639  (dataset_name, run))
3640  else:
3641  runs_to_use_tmp.append(run)
3642 
3643  if len(runs_to_use) > 0:
3644  runs = runs_to_use_tmp
3645  self.logger.info("Using %d out of %d runs " \
3646  "of dataset `%s'" % \
3647  (len(runs), len(runs_in_dataset),
3648  dataset_name))
3649  else:
3650  runs = runs_in_dataset
3651 
3652  if len(runs_to_ignore) > 0:
3653  runs_tmp = []
3654  for run in runs:
3655  if not run in runs_to_ignore:
3656  runs_tmp.append(run)
3657  self.logger.info("Ignoring %d out of %d runs " \
3658  "of dataset `%s'" % \
3659  (len(runs)- len(runs_tmp),
3660  len(runs_in_dataset),
3661  dataset_name))
3662  runs = runs_tmp
3663 
3664  if self.todofile != "YourToDofile.txt":
3665  runs_todo = []
3666  print "Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile
3667  cmd="grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3668  (status, output)=commands.getstatusoutput(cmd)
3669  for run in runs:
3670  run_str="%s" %run
3671  if run_str in output:
3672  runs_todo.append(run)
3673  self.logger.info("Using %d runs " \
3674  "of dataset `%s'" % \
3675  (len(runs_todo),
3676  dataset_name))
3677  runs=runs_todo
3678 
3679  Json_runs = []
3680  if self.Jsonfilename != "YourJSON.txt":
3681  good_runs = []
3682  self.Jsonlumi = True
3683  # We were passed a Jsonfile containing a dictionary of
3684  # run/lumisection pairs.
3685  self.logger.info("Reading runs and lumisections from file `%s'" % \
3686  self.Jsonfilename)
3687  try:
3688  Jsonfile = open(self.Jsonfilename, "r")
3689  for names in Jsonfile:
3690  dictNames= eval(str(names))
3691  for key in dictNames:
3692  intkey=int(key)
3693  Json_runs.append(intkey)
3694  Jsonfile.close()
3695  except IOError:
3696  msg = "ERROR: Could not open Jsonfile `%s'" % \
3697  self.Jsonfilename
3698  self.logger.fatal(msg)
3699  raise Error(msg)
3700  for run in runs:
3701  if run in Json_runs:
3702  good_runs.append(run)
3703  self.logger.info("Using %d runs " \
3704  "of dataset `%s'" % \
3705  (len(good_runs),
3706  dataset_name))
3707  runs=good_runs
3708  if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):
3709  good_runs = []
3710  # We were passed a Jsonfile containing a dictionary keyed by
3711  # run number; only the run numbers are used here.
3712  self.logger.info("Reading runs from file `%s'" % \
3713  self.Jsonrunfilename)
3714  try:
3715  Jsonfile = open(self.Jsonrunfilename, "r")
3716  for names in Jsonfile:
3717  dictNames= eval(str(names))
3718  for key in dictNames:
3719  intkey=int(key)
3720  Json_runs.append(intkey)
3721  Jsonfile.close()
3722  except IOError:
3723  msg = "ERROR: Could not open Jsonfile `%s'" % \
3724  self.Jsonrunfilename
3725  self.logger.fatal(msg)
3726  raise Error(msg)
3727  for run in runs:
3728  if run in Json_runs:
3729  good_runs.append(run)
3730  self.logger.info("Using %d runs " \
3731  "of dataset `%s'" % \
3732  (len(good_runs),
3733  dataset_name))
3734  runs=good_runs
3735 
3736  self.datasets_to_use[dataset_name] = runs
3737 
3738  # End of process_runs_use_and_ignore_lists().
3739 
3740  ##########
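The run selection performed by process_runs_use_and_ignore_lists() can be summarised in a few lines. What follows is a minimal stand-alone sketch, not the cmsHarvester implementation: the helper names select_runs() and runs_from_json_text() are hypothetical, and json.loads() is used instead of the eval() call above, on the assumption that the file holds a JSON dictionary keyed by run number.

```python
import json


def select_runs(runs_in_dataset, runs_to_use, runs_to_ignore):
    """Return the runs to process for one dataset.

    An empty `use' list means: start from all runs in the dataset.
    Requested runs that the dataset does not contain are dropped.
    """
    if len(runs_to_use) > 0:
        runs = [run for run in runs_to_use if run in runs_in_dataset]
    else:
        runs = list(runs_in_dataset)
    # The `ignore' list is applied last, so it wins over the `use' list.
    return [run for run in runs if run not in runs_to_ignore]


def runs_from_json_text(text):
    """Extract the run numbers (the dictionary keys) from JSON text."""
    return sorted(int(key) for key in json.loads(text))
```

Parsing with json.loads() avoids executing arbitrary text from the input file, which eval() would do.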
3741 
3742  def singlify_datasets(self):
3743  """Remove all but the largest part of all datasets.
3744 
3745  This allows us to harvest at least part of these datasets
3746  using single-step harvesting until the two-step approach
3747  works.
3748 
3749  """
3750 
3751  # DEBUG DEBUG DEBUG
3752  assert self.harvesting_mode == "single-step-allow-partial"
3753  # DEBUG DEBUG DEBUG end
3754 
3755  for dataset_name in self.datasets_to_use:
3756  for run_number in self.datasets_information[dataset_name]["runs"]:
3757  max_events = max(self.datasets_information[dataset_name]["sites"][run_number].values())
3758  sites_with_max_events = [i[0] for i in self.datasets_information[dataset_name]["sites"][run_number].items() if i[1] == max_events]
3759  self.logger.warning("Singlifying dataset `%s', " \
3760  "run %d" % \
3761  (dataset_name, run_number))
3762  cmssw_version = self.datasets_information[dataset_name] \
3763  ["cmssw_version"]
3764  selected_site = self.pick_a_site(sites_with_max_events,
3765  cmssw_version)
3766 
3767  # Let's tell the user that we're manhandling this dataset.
3768  nevents_old = self.datasets_information[dataset_name]["num_events"][run_number]
3769  self.logger.warning(" --> " \
3770  "only harvesting partial statistics: " \
3771  "%d out of %d events (%5.1f%%) " \
3772  "at site `%s'" % \
3773  (max_events,
3774  nevents_old,
3775  100. * max_events / nevents_old,
3776  selected_site))
3777  self.logger.warning("!!! Please note that the number of " \
3778  "events in the output path name will " \
3779  "NOT reflect the actual statistics in " \
3780  "the harvested results !!!")
3781 
3782  # We found the site with the highest statistics and
3783  # the corresponding number of events. (CRAB gets upset
3784  # if we ask for more events than there are at a given
3785  # site.) Now update this information in our main
3786  # datasets_information variable.
3787  self.datasets_information[dataset_name]["sites"][run_number] = {selected_site: max_events}
3788  self.datasets_information[dataset_name]["num_events"][run_number] = max_events
3789  #self.datasets_information[dataset_name]["sites"][run_number] = [selected_site]
3790 
3791  # End of singlify_datasets.
3792 
3793  ##########
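As an illustration of the `singlify' step, the sketch below finds the site(s) holding the largest part of a run and formats the harvested fraction. It is a hedged example only: largest_part() and harvested_fraction() are hypothetical helpers, the site names in the usage note are placeholders, and the real code delegates the final choice among equally good sites to pick_a_site().

```python
def largest_part(sites):
    """Return (site_names, max_events) for the sites with the most events.

    `sites' maps a site name to the number of events available there.
    """
    max_events = max(sites.values())
    site_names = sorted(name for (name, nevents) in sites.items()
                        if nevents == max_events)
    return site_names, max_events


def harvested_fraction(max_events, nevents_total):
    """Format the fraction of events that will actually be harvested."""
    return "%5.1f%%" % (100. * max_events / nevents_total)
```

For example, largest_part({"site-a": 10, "site-b": 25}) selects "site-b" with 25 events, and harvested_fraction(25, 50) reports half of the statistics.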
3794 
3795  def check_dataset_list(self):
3796  """Check list of dataset names for impossible ones.
3797 
3798  Two kinds of checks are done:
3799  - Checks for things that do not make sense. These lead to
3800  errors and skipped datasets.
3801  - Sanity checks. For these warnings are issued but the user is
3802  considered to be the authoritative expert.
3803 
3804  Checks performed:
3805  - The CMSSW version encoded in the dataset name should match
3806  self.cmssw_version. This is critical.
3807  - There should be some events in the dataset/run. This is
3808  critical in the sense that CRAB refuses to create jobs for
3809  zero events. And yes, this does happen in practice. E.g. the
3810  reprocessed CRAFT08 datasets contain runs with zero events.
3811  - A cursory check is performed to see if the harvesting type
3812  makes sense for the data type. This should prevent the user
3813  from inadvertently running RelVal for data.
3814  - It is not possible to run single-step harvesting jobs on
3815  samples that are not fully contained at a single site.
3816  - Each dataset/run has to be available at at least one site.
3817 
3818  """
3819 
3820  self.logger.info("Performing sanity checks on dataset list...")
3821 
3822  dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3823 
3824  for dataset_name in self.datasets_to_use.keys():
3825 
3826  # Check CMSSW version.
3827  version_from_dataset = self.datasets_information[dataset_name] \
3828  ["cmssw_version"]
3829  if version_from_dataset != self.cmssw_version:
3830  msg = " CMSSW version mismatch for dataset `%s' " \
3831  "(%s vs. %s)" % \
3832  (dataset_name,
3833  self.cmssw_version, version_from_dataset)
3834  if self.force_running:
3835  # Expert mode: just warn, then continue.
3836  self.logger.warning("%s " \
3837  "--> `force mode' active: " \
3838  "run anyway" % msg)
3839  else:
3840  del dataset_names_after_checks[dataset_name]
3841  self.logger.warning("%s " \
3842  "--> skipping" % msg)
3843  continue
3844 
3845  ###
3846 
3847  # Check that the harvesting type makes sense for the
3848  # sample. E.g. normally one would not run the DQMOffline
3849  # harvesting on Monte Carlo.
3850  # TODO TODO TODO
3851  # This should be further refined.
3852  suspicious = False
3853  datatype = self.datasets_information[dataset_name]["datatype"]
3854  if datatype == "data":
3855  # Normally only DQM harvesting is run on data.
3856  if self.harvesting_type != "DQMOffline":
3857  suspicious = True
3858  elif datatype == "mc":
3859  if self.harvesting_type == "DQMOffline":
3860  suspicious = True
3861  else:
3862  # Doh!
3863  assert False, "ERROR Impossible data type `%s' " \
3864  "for dataset `%s'" % \
3865  (datatype, dataset_name)
3866  if suspicious:
3867  msg = " Normally one does not run `%s' harvesting " \
3868  "on %s samples, are you sure?" % \
3869  (self.harvesting_type, datatype)
3870  if self.force_running:
3871  self.logger.warning("%s " \
3872  "--> `force mode' active: " \
3873  "run anyway" % msg)
3874  else:
3875  del dataset_names_after_checks[dataset_name]
3876  self.logger.warning("%s " \
3877  "--> skipping" % msg)
3878  continue
3879 
3880  # TODO TODO TODO end
3881 
3882  ###
3883 
3884  # BUG BUG BUG
3885  # For the moment, due to a problem with DBS, I cannot
3886  # figure out the GlobalTag for data by myself. (For MC
3887  # it's no problem.) This means that unless a GlobalTag was
3888  # specified from the command line, we will have to skip
3889  # any data datasets.
3890 
3891  if datatype == "data":
3892  if self.globaltag is None:
3893  msg = "For data datasets (like `%s') " \
3894  "we need a GlobalTag" % \
3895  dataset_name
3896  del dataset_names_after_checks[dataset_name]
3897  self.logger.warning("%s " \
3898  "--> skipping" % msg)
3899  continue
3900 
3901  # BUG BUG BUG end
3902 
3903  ###
3904 
3905  # Check if the GlobalTag exists and (if we're using
3906  # reference histograms) if it's ready to be used with
3907  # reference histograms.
3908  globaltag = self.datasets_information[dataset_name]["globaltag"]
3909  if not globaltag in self.globaltag_check_cache:
3910  if self.check_globaltag(globaltag):
3911  self.globaltag_check_cache.append(globaltag)
3912  else:
3913  msg = "Something is wrong with GlobalTag `%s' " \
3914  "used by dataset `%s'!" % \
3915  (globaltag, dataset_name)
3916  if self.use_ref_hists:
3917  msg += "\n(Either it does not exist or it " \
3918  "does not contain the required key to " \
3919  "be used with reference histograms.)"
3920  else:
3921  msg += "\n(It probably just does not exist.)"
3922  self.logger.fatal(msg)
3923  raise Usage(msg)
3924 
3925  ###
3926 
3927  # Require that each run is available at least somewhere.
3928  runs_without_sites = [i for (i, j) in \
3929  self.datasets_information[dataset_name] \
3930  ["sites"].items() \
3931  if len(j) < 1 and \
3932  i in self.datasets_to_use[dataset_name]]
3933  if len(runs_without_sites) > 0:
3934  for run_without_sites in runs_without_sites:
3935  try:
3936  dataset_names_after_checks[dataset_name].remove(run_without_sites)
3937  except (KeyError, ValueError):
3938  pass
3939  self.logger.warning(" removed %d unavailable run(s) " \
3940  "from dataset `%s'" % \
3941  (len(runs_without_sites), dataset_name))
3942  self.logger.debug(" (%s)" % \
3943  ", ".join([str(i) for i in \
3944  runs_without_sites]))
3945 
3946  ###
3947 
3948  # Unless we're running two-step harvesting: only allow
3949  # samples located on a single site.
3950  if not self.harvesting_mode == "two-step":
3951  for run_number in self.datasets_to_use[dataset_name]:
3952  # DEBUG DEBUG DEBUG
3953 ## if self.datasets_information[dataset_name]["num_events"][run_number] != 0:
3954 ## pdb.set_trace()
3955  # DEBUG DEBUG DEBUG end
3956  num_sites = len(self.datasets_information[dataset_name] \
3957  ["sites"][run_number])
3958  if num_sites > 1 and \
3959  not self.datasets_information[dataset_name] \
3960  ["mirrored"][run_number]:
3961  # Cannot do this with a single-step job, not
3962  # even in force mode. It just does not make
3963  # sense.
3964  msg = " Dataset `%s', run %d is spread across more " \
3965  "than one site.\n" \
3966  " Cannot run single-step harvesting on " \
3967  "samples spread across multiple sites" % \
3968  (dataset_name, run_number)
3969  try:
3970  dataset_names_after_checks[dataset_name].remove(run_number)
3971  except (KeyError, ValueError):
3972  pass
3973  self.logger.warning("%s " \
3974  "--> skipping" % msg)
3975 
3976  ###
3977 
3978  # Require that the dataset/run is non-empty.
3979  # NOTE: To avoid reconsidering empty runs/datasets next
3980  # time around, we do include them in the book keeping.
3981  # BUG BUG BUG
3982  # This should sum only over the runs that we use!
3983  tmp = [j for (i, j) in self.datasets_information \
3984  [dataset_name]["num_events"].items() \
3985  if i in self.datasets_to_use[dataset_name]]
3986  num_events_dataset = sum(tmp)
3987  # BUG BUG BUG end
3988  if num_events_dataset < 1:
3989  msg = " dataset `%s' is empty" % dataset_name
3990  del dataset_names_after_checks[dataset_name]
3991  self.logger.warning("%s " \
3992  "--> skipping" % msg)
3993  # Update the book keeping with all the runs in the dataset.
3994  # DEBUG DEBUG DEBUG
3995  #assert set([j for (i, j) in self.datasets_information \
3996  # [dataset_name]["num_events"].items() \
3997  # if i in self.datasets_to_use[dataset_name]]) == \
3998  # set([0])
3999  # DEBUG DEBUG DEBUG end
4000  #self.book_keeping_information[dataset_name] = self.datasets_information \
4001  # [dataset_name]["num_events"]
4002  continue
4003 
4004  tmp = [i for i in \
4005  self.datasets_information[dataset_name] \
4006  ["num_events"].items() if i[1] < 1]
4007  tmp = [i for i in tmp if i[0] in self.datasets_to_use[dataset_name]]
4008  empty_runs = dict(tmp)
4009  if len(empty_runs) > 0:
4010  for empty_run in empty_runs:
4011  try:
4012  dataset_names_after_checks[dataset_name].remove(empty_run)
4013  except (KeyError, ValueError):
4014  pass
4015  self.logger.info(" removed %d empty run(s) from dataset `%s'" % \
4016  (len(empty_runs), dataset_name))
4017  self.logger.debug(" (%s)" % \
4018  ", ".join([str(i) for i in empty_runs]))
4019 
4020  ###
4021 
4022  # If we emptied out a complete dataset, remove the whole
4023  # thing.
4024  dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4025  for (dataset_name, runs) in dataset_names_after_checks.iteritems():
4026  if len(runs) < 1:
4027  self.logger.warning(" Removing dataset without any runs " \
4028  "(left) `%s'" % \
4029  dataset_name)
4030  del dataset_names_after_checks_tmp[dataset_name]
4031  dataset_names_after_checks = dataset_names_after_checks_tmp
4032 
4033  ###
4034 
4035  self.logger.warning(" --> Removed %d dataset(s)" % \
4036  (len(self.datasets_to_use) -
4037  len(dataset_names_after_checks)))
4038 
4039  # Now store the modified version of the dataset list.
4040  self.datasets_to_use = dataset_names_after_checks
4041 
4042  # End of check_dataset_list.
4043 
4044  ##########
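Two of the checks in check_dataset_list() are easy to isolate: the `suspicious combination' test and the final removal of datasets that have no runs left. The following is a simplified sketch under stated assumptions; is_suspicious() and drop_datasets_without_runs() are hypothetical names, and the sketch raises ValueError where the original uses an assertion.

```python
def is_suspicious(datatype, harvesting_type):
    """Flag combinations of data type and harvesting type that are unusual."""
    if datatype == "data":
        # Normally only DQMOffline harvesting is run on data.
        return harvesting_type != "DQMOffline"
    elif datatype == "mc":
        return harvesting_type == "DQMOffline"
    raise ValueError("Impossible data type `%s'" % datatype)


def drop_datasets_without_runs(datasets):
    """Return a new dict without the datasets whose run list is empty."""
    return dict((name, runs) for (name, runs) in datasets.items()
                if len(runs) > 0)
```

Building a new dictionary avoids modifying the dictionary that is being iterated over, which is why the original works on a deep copy.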
4045 
4046  def escape_dataset_name(self, dataset_name):
4047  """Escape a DBS dataset name.
4048 
4049  Escape a DBS dataset name such that it does not cause trouble
4050  with the file system. This means turning each `/' into `__',
4051  except for the leading (and any trailing) one, which is just removed.
4052 
4053  """
4054 
4055  escaped_dataset_name = dataset_name
4056  escaped_dataset_name = escaped_dataset_name.strip("/")
4057  escaped_dataset_name = escaped_dataset_name.replace("/", "__")
4058 
4059  return escaped_dataset_name
4060 
4061  ##########
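A short stand-alone version of the escaping rule, for illustration. The function mirrors escape_dataset_name() without the class; the dataset name used in the usage note is a made-up example, not taken from DBS.

```python
def escape_dataset_name(dataset_name):
    """Make a DBS dataset name safe to use in file names."""
    # Leading and trailing slashes are removed, inner ones become `__'.
    return dataset_name.strip("/").replace("/", "__")
```

With this rule, "/RelValTTbar/SomeVersion-v1/GEN-SIM-RECO" becomes "RelValTTbar__SomeVersion-v1__GEN-SIM-RECO".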
4062 
4063  # BUG BUG BUG
4064  # This is a bit of a redundant method, isn't it?
4065  def create_config_file_name(self, dataset_name, run_number):
4066  """Generate the name of the configuration file to be run by
4067  CRAB.
4068 
4069  Depending on the harvesting mode (single-step or two-step)
4070  this is the name of the real harvesting configuration or the
4071  name of the first-step ME summary extraction configuration.
4072 
4073  """
4074 
4075  if self.harvesting_mode == "single-step":
4076  config_file_name = self.create_harvesting_config_file_name(dataset_name)
4077  elif self.harvesting_mode == "single-step-allow-partial":
4078  config_file_name = self.create_harvesting_config_file_name(dataset_name)
4079 ## # Only add the alarming piece to the file name if this is
4080 ## # a spread-out dataset.
4081 ## pdb.set_trace()
4082 ## if self.datasets_information[dataset_name] \
4083 ## ["mirrored"][run_number] == False:
4084 ## config_file_name = config_file_name.replace(".py", "_partial.py")
4085  elif self.harvesting_mode == "two-step":
4086  config_file_name = self.create_me_summary_config_file_name(dataset_name)
4087  else:
4088  assert False, "ERROR Unknown harvesting mode `%s'" % \
4089  self.harvesting_mode
4090 
4091  # End of create_config_file_name.
4092  return config_file_name
4093  # BUG BUG BUG end
4094 
4095  ##########
4096 
4097  def create_harvesting_config_file_name(self, dataset_name):
4098  "Generate the name to be used for the harvesting config file."
4099 
4100  file_name_base = "harvesting.py"
4101  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4102  config_file_name = file_name_base.replace(".py",
4103  "_%s.py" % \
4104  dataset_name_escaped)
4105 
4106  # End of create_harvesting_config_file_name.
4107  return config_file_name
4108 
4109  ##########
4110 
4111  def create_me_summary_config_file_name(self, dataset_name):
4112  "Generate the name of the ME summary extraction config file."
4113 
4114  file_name_base = "me_extraction.py"
4115  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4116  config_file_name = file_name_base.replace(".py",
4117  "_%s.py" % \
4118  dataset_name_escaped)
4119 
4120  # End of create_me_summary_config_file_name.
4121  return config_file_name
4122 
4123  ##########
4124 
4125  def create_output_file_name(self, dataset_name, run_number=None):
4126  """Create the name of the output file to be used.
4127 
4128  This is the name of the output file of the `first step'. In
4129  the case of single-step harvesting this is already the final
4130  harvesting output ROOT file. In the case of two-step
4131  harvesting it is the name of the intermediary ME summary
4132  file.
4133 
4134  """
4135 
4136  # BUG BUG BUG
4137  # This method has become a bit of a mess. Originally it was
4138  # nice to have one entry point for both single- and two-step
4139  # output file names. However, now the former needs the run
4140  # number, while the latter does not even know about run
4141  # numbers. This should be fixed up a bit.
4142  # BUG BUG BUG end
4143 
4144  if self.harvesting_mode == "single-step":
4145  # DEBUG DEBUG DEBUG
4146  assert not run_number is None
4147  # DEBUG DEBUG DEBUG end
4148  output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4149  elif self.harvesting_mode == "single-step-allow-partial":
4150  # DEBUG DEBUG DEBUG
4151  assert not run_number is None
4152  # DEBUG DEBUG DEBUG end
4153  output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4154  elif self.harvesting_mode == "two-step":
4155  # DEBUG DEBUG DEBUG
4156  assert run_number is None
4157  # DEBUG DEBUG DEBUG end
4158  output_file_name = self.create_me_summary_output_file_name(dataset_name)
4159  else:
4160  # This should not be possible, but hey...
4161  assert False, "ERROR Unknown harvesting mode `%s'" % \
4162  self.harvesting_mode
4163 
4164  # End of create_output_file_name.
4165  return output_file_name
4166 
4167  ##########
4168 
4169  def create_harvesting_output_file_name(self, dataset_name, run_number):
4170  """Generate the name to be used for the harvesting output file.
4171 
4172  This harvesting output file is the _final_ ROOT output file
4173  containing the harvesting results. In case of two-step
4174  harvesting there is an intermediate ME output file as well.
4175 
4176  """
4177 
4178  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4179 
4180  # Hmmm, looking at the code for the DQMFileSaver this might
4181  # actually be the place where the first part of this file
4182  # naming scheme comes from.
4183  # NOTE: It looks like the `V0001' comes from the DQM
4184  # version. This is something that cannot be looked up from
4185  # here, so let's hope it does not change too often.
4186  output_file_name = "DQM_V0001_R%09d__%s.root" % \
4187  (run_number, dataset_name_escaped)
4188  if self.harvesting_mode.find("partial") > -1:
4189  # Only add the alarming piece to the file name if this is
4190  # a spread-out dataset.
4191  if self.datasets_information[dataset_name] \
4192  ["mirrored"][run_number] == False:
4193  output_file_name = output_file_name.replace(".root", \
4194  "_partial.root")
4195 
4196  # End of create_harvesting_output_file_name.
4197  return output_file_name
4198 
4199  ##########
4200 
4201  def create_me_summary_output_file_name(self, dataset_name):
4202  """Generate the name of the intermediate ME file to be
4203  used in two-step harvesting.
4204 
4205  """
4206 
4207  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4208  output_file_name = "me_summary_%s.root" % \
4209  dataset_name_escaped
4210 
4211  # End of create_me_summary_output_file_name.
4212  return output_file_name
4213 
4214  ##########
4215 
4216  def create_multicrab_block_name(self, dataset_name, run_number, index):
4217  """Create the block name to use for this dataset/run number.
4218 
4219  This is what appears in the brackets `[]' in multicrab.cfg. It
4220  is used as the name of the job and to create output
4221  directories.
4222 
4223  """
4224 
4225  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4226  block_name = "%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4227 
4228  # End of create_multicrab_block_name.
4229  return block_name
4230 
4231  ##########
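The naming schemes used by the methods above can be tried out in isolation. This is a sketch assuming the format strings shown in the listing; the free functions and the `partial' flag are hypothetical simplifications (in the original, the `_partial' suffix depends on the harvesting mode and on whether the run is mirrored).

```python
def escape_dataset_name(dataset_name):
    return dataset_name.strip("/").replace("/", "__")


def harvesting_output_file_name(dataset_name, run_number, partial=False):
    """Build the final harvesting output file name for one run."""
    name = "DQM_V0001_R%09d__%s.root" % \
           (run_number, escape_dataset_name(dataset_name))
    if partial:
        # Mark output that contains only part of the statistics.
        name = name.replace(".root", "_partial.root")
    return name


def multicrab_block_name(dataset_name, run_number, index):
    """Build the `[...]' block name used in multicrab.cfg."""
    return "%s_%09d_%s" % \
           (escape_dataset_name(dataset_name), run_number, index)
```

The run number is always zero-padded to nine digits, so names sort consistently by run.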
4232 
4233  def create_crab_config(self):
4234  """Create a CRAB configuration for a given job.
4235 
4236  NOTE: This is _not_ a complete (as in: submittable) CRAB
4237  configuration. It is used to store the common settings for the
4238  multicrab configuration.
4239 
4240  NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4241 
4242  NOTE: According to CRAB, you `Must define exactly two of
4243  total_number_of_events, events_per_job, or
4244  number_of_jobs.'. For single-step harvesting we force one job,
4245  for the rest we don't really care.
4246 
4247  # BUG BUG BUG
4248  # With the current version of CRAB (2.6.1), in which Daniele
4249  # fixed the behaviour of no_block_boundary for me, one _has to
4250  # specify_ the total_number_of_events and one single site in
4251  # the se_white_list.
4252  # BUG BUG BUG end
4253 
4254  """
4255 
4256  tmp = []
4257 
4258  # This is the stuff we will need to fill in.
4259  castor_prefix = self.castor_prefix
4260 
4261  tmp.append(self.config_file_header())
4262  tmp.append("")
4263 
4264  ## CRAB
4265  ##------
4266  tmp.append("[CRAB]")
4267  tmp.append("jobtype = cmssw")
4268  tmp.append("")
4269 
4270  ## GRID
4271  ##------
4272  tmp.append("[GRID]")
4273  tmp.append("virtual_organization=cms")
4274  tmp.append("")
4275 
4276  ## USER
4277  ##------
4278  tmp.append("[USER]")
4279  tmp.append("copy_data = 1")
4280  tmp.append("")
4281 
4282  ## CMSSW
4283  ##-------
4284  tmp.append("[CMSSW]")
4285  tmp.append("# This reveals data hosted on T1 sites,")
4286  tmp.append("# which is normally hidden by CRAB.")
4287  tmp.append("show_prod = 1")
4288  tmp.append("number_of_jobs = 1")
4289  if self.Jsonlumi == True:
4290  tmp.append("lumi_mask = %s" % self.Jsonfilename)
4291  tmp.append("total_number_of_lumis = -1")
4292  else:
4293  if self.harvesting_type == "DQMOffline":
4294  tmp.append("total_number_of_lumis = -1")
4295  else:
4296  tmp.append("total_number_of_events = -1")
4297  if self.harvesting_mode.find("single-step") > -1:
4298  tmp.append("# Force everything to run in one job.")
4299  tmp.append("no_block_boundary = 1")
4300  tmp.append("")
4301 
4302  ## CAF
4303  ##-----
4304  tmp.append("[CAF]")
4305 
4306  crab_config = "\n".join(tmp)
4307 
4308  # End of create_crab_config.
4309  return crab_config
4310 
4311  ##########
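The branching that decides the contents of the [CMSSW] section can be expressed as a small pure function. This is a hedged sketch of that logic only: cmssw_section() and its parameters are hypothetical, and the explanatory comment lines written by the original are omitted.

```python
def cmssw_section(harvesting_type, harvesting_mode,
                  json_lumi=False, json_file_name=None):
    """Return the text of a [CMSSW] section for crab.cfg."""
    lines = ["[CMSSW]", "show_prod = 1", "number_of_jobs = 1"]
    if json_lumi:
        # A lumi mask was given: select by lumisection.
        lines.append("lumi_mask = %s" % json_file_name)
        lines.append("total_number_of_lumis = -1")
    elif harvesting_type == "DQMOffline":
        lines.append("total_number_of_lumis = -1")
    else:
        lines.append("total_number_of_events = -1")
    if "single-step" in harvesting_mode:
        # Force everything to run in one job.
        lines.append("no_block_boundary = 1")
    return "\n".join(lines)
```

In every branch the function sets number_of_jobs together with one of the total_number_of_* keys, in line with the CRAB requirement quoted in the docstring above.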
4312 
4313  def create_multicrab_config(self):
4314  """Create a multicrab.cfg file for all samples.
4315 
4316  This creates the contents for a multicrab.cfg file that uses
4317  the crab.cfg file (generated elsewhere) for the basic settings
4318  and contains blocks for each run of each dataset.
4319 
4320  # BUG BUG BUG
4321  # The fact that it's necessary to specify the se_white_list
4322  # and the total_number_of_events is due to our use of CRAB
4323  # version 2.6.1. This should no longer be necessary in the
4324  # future.
4325  # BUG BUG BUG end
4326 
4327  """
4328 
4329  cmd="who i am | cut -f1 -d' '"
4330  (status, output)=commands.getstatusoutput(cmd)
4331  UserName = output
4332 
4333  if self.caf_access == True:
4334  print "Extracting %s as user name" %UserName
4335 
4336  number_max_sites = self.nr_max_sites + 1
4337 
4338  multicrab_config_lines = []
4339  multicrab_config_lines.append(self.config_file_header())
4340  multicrab_config_lines.append("")
4341  multicrab_config_lines.append("[MULTICRAB]")
4342  multicrab_config_lines.append("cfg = crab.cfg")
4343  multicrab_config_lines.append("")
4344 
4345  dataset_names = self.datasets_to_use.keys()
4346  dataset_names.sort()
4347 
4348  for dataset_name in dataset_names:
4349  runs = self.datasets_to_use[dataset_name]
4350  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4351  castor_prefix = self.castor_prefix
4352 
4353  for run in runs:
4354 
4355  # CASTOR output dir.
4356  castor_dir = self.datasets_information[dataset_name] \
4357  ["castor_path"][run]
4358 
4359  cmd = "rfdir %s" % castor_dir
4360  (status, output) = commands.getstatusoutput(cmd)
4361 
4362  if len(output) <= 0:
4363 
4364  # DEBUG DEBUG DEBUG
4365  # We should only get here if we're treating a
4366  # dataset/run that is fully contained at a single
4367  # site.
4368  assert (len(self.datasets_information[dataset_name] \
4369  ["sites"][run]) == 1) or \
4370  self.datasets_information[dataset_name]["mirrored"][run]
4371  # DEBUG DEBUG DEBUG end
4372 
4373  site_names = self.datasets_information[dataset_name] \
4374  ["sites"][run].keys()
4375 
4376  for i in range(1, number_max_sites, 1):
4377  if len(site_names) > 0:
4378  index = "site_%02d" % (i)
4379 
4380  config_file_name = self. \
4381  create_config_file_name(dataset_name, run)
4382  output_file_name = self. \
4383  create_output_file_name(dataset_name, run)
4384 
4385 
4386  # If we're looking at a mirrored dataset we just pick
4387  # one of the sites. Otherwise there is nothing to
4388  # choose.
4389 
4390  # Loop variable
4391  loop = 0
4392 
4393  if len(site_names) > 1:
4394  cmssw_version = self.datasets_information[dataset_name] \
4395  ["cmssw_version"]
4396  self.logger.info("Picking site for mirrored dataset " \
4397  "`%s', run %d" % \
4398  (dataset_name, run))
4399  site_name = self.pick_a_site(site_names, cmssw_version)
4400  if site_name in site_names:
4401  site_names.remove(site_name)
4402 
4403  else:
4404  site_name = site_names[0]
4405  site_names.remove(site_name)
4406 
4407  if site_name == self.no_matching_site_found_str:
4408  if loop < 1:
4409  break
4410 
4411  nevents = self.datasets_information[dataset_name]["num_events"][run]
4412 
4413  # The block name.
4414  multicrab_block_name = self.create_multicrab_block_name( \
4415  dataset_name, run, index)
4416  multicrab_config_lines.append("[%s]" % \
4417  multicrab_block_name)
4418 
4419  ## CRAB
4420  ##------
4421  if site_name == "caf.cern.ch":
4422  multicrab_config_lines.append("CRAB.use_server=0")
4423  multicrab_config_lines.append("CRAB.scheduler=caf")
4424  else:
4425  multicrab_config_lines.append("scheduler = glite")
4426 
4427  ## GRID
4428  ##------
4429  if site_name == "caf.cern.ch":
4430  pass
4431  else:
4432  multicrab_config_lines.append("GRID.se_white_list = %s" % \
4433  site_name)
4434  multicrab_config_lines.append("# This removes the default blacklisting of T1 sites.")
4435  multicrab_config_lines.append("GRID.remove_default_blacklist = 1")
4436  multicrab_config_lines.append("GRID.rb = CERN")
4437  if not self.non_t1access:
4438  multicrab_config_lines.append("GRID.role = t1access")
4439 
4440  ## USER
4441  ##------
4442 
4443  castor_dir = castor_dir.replace(castor_prefix, "")
4444  multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4445  multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4446  castor_dir)
4447  multicrab_config_lines.append("USER.check_user_remote_dir=0")
4448 
4449  if site_name == "caf.cern.ch":
4450  multicrab_config_lines.append("USER.storage_path=%s" % castor_prefix)
4451  #multicrab_config_lines.append("USER.storage_element=T2_CH_CAF")
4452  #castor_dir = castor_dir.replace("/cms/store/caf/user/%s" %UserName, "")
4453  #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4454  # castor_dir)
4455  else:
4456  multicrab_config_lines.append("USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4457  #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4458  # castor_dir)
4459  #multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4460 
4461  ## CMSSW
4462  ##-------
4463  multicrab_config_lines.append("CMSSW.pset = %s" % \
4464  config_file_name)
4465  multicrab_config_lines.append("CMSSW.datasetpath = %s" % \
4466  dataset_name)
4467  multicrab_config_lines.append("CMSSW.runselection = %d" % \
4468  run)
4469 
4470  if self.Jsonlumi == True:
4471  pass
4472  else:
4473  if self.harvesting_type == "DQMOffline":
4474  pass
4475  else:
4476  multicrab_config_lines.append("CMSSW.total_number_of_events = %d" % \
4477  nevents)
4478  # The output file name.
4479  multicrab_config_lines.append("CMSSW.output_file = %s" % \
4480  output_file_name)
4481 
4482  ## CAF
4483  ##-----
4484  if site_name == "caf.cern.ch":
4485  multicrab_config_lines.append("CAF.queue=cmscaf1nd")
4486 
4487 
4488  # End of block.
4489  multicrab_config_lines.append("")
4490 
4491  loop = loop + 1
4492 
4493  self.all_sites_found = True
4494 
4495  multicrab_config = "\n".join(multicrab_config_lines)
4496 
4497  # End of create_multicrab_config.
4498  return multicrab_config
4499 
4500  ##########
4501 
4502  def check_globaltag(self, globaltag=None):
4503  """Check if globaltag exists.
4504 
4505  Check if globaltag exists as GlobalTag in the database given
4506  by self.frontier_connection_name['globaltag']. If globaltag is
4507  None, self.globaltag is used instead.
4508 
4509  If we're going to use reference histograms this method also
4510  checks for the existence of the required key in the GlobalTag.
4511 
4512  """
4513 
4514  if globaltag is None:
4515  globaltag = self.globaltag
4516 
4517  # All GlobalTags should end in `::All', right?
4518  if globaltag.endswith("::All"):
4519  globaltag = globaltag[:-5]
4520 
4521  connect_name = self.frontier_connection_name["globaltag"]
4522  # BUG BUG BUG
4523  # There is a bug in cmscond_tagtree_list: some magic is
4524  # missing from the implementation requiring one to specify
4525  # explicitly the name of the squid to connect to. Since the
4526  # cmsHarvester can only be run from the CERN network anyway,
4527  # cmsfrontier:8000 is hard-coded in here. Not nice but it
4528  # works.
4529  connect_name = connect_name.replace("frontier://",
4530  "frontier://cmsfrontier:8000/")
4531  # BUG BUG BUG end
4532  connect_name += self.db_account_name_cms_cond_globaltag()
4533 
4534  tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4535 
4536  #----------
4537 
4538  tag_contains_ref_hist_key = False
4539  if self.use_ref_hists and tag_exists:
4540  # Check for the key required to use reference histograms.
4541  tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4542 
4543  #----------
4544 
4545  if self.use_ref_hists:
4546  ret_val = tag_exists and tag_contains_ref_hist_key
4547  else:
4548  ret_val = tag_exists
4549 
4550  #----------
4551 
4552  # End of check_globaltag.
4553  return ret_val
4554 
4555  ##########
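The two string manipulations at the start of check_globaltag() are shown below as stand-alone helpers. This is a sketch only: normalise_globaltag() and add_squid() are hypothetical names, and the tag and connection strings in the usage note are placeholders rather than values taken from cmsHarvester.

```python
def normalise_globaltag(globaltag):
    """Strip the trailing `::All' from a GlobalTag name, if present."""
    suffix = "::All"
    if globaltag.endswith(suffix):
        globaltag = globaltag[:-len(suffix)]
    return globaltag


def add_squid(connect_name, squid="cmsfrontier:8000"):
    """Insert an explicit squid host into a frontier connection string."""
    return connect_name.replace("frontier://", "frontier://%s/" % squid)
```

For instance, normalise_globaltag("SOME_TAG::All") gives "SOME_TAG", and add_squid("frontier://SomeServer/") gives "frontier://cmsfrontier:8000/SomeServer/".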
4556 
4557  def check_globaltag_exists(self, globaltag, connect_name):
4558  """Check if globaltag exists.
4559 
4560  """
4561 
4562  self.logger.info("Checking existence of GlobalTag `%s'" % \
4563  globaltag)
4564  self.logger.debug(" (Using database connection `%s')" % \
4565  connect_name)
4566 
4567  cmd = "cmscond_tagtree_list -c %s -T %s" % \
4568  (connect_name, globaltag)
4569  (status, output) = commands.getstatusoutput(cmd)
4570  if status != 0 or \
4571  output.find("error") > -1:
4572  msg = "Could not check existence of GlobalTag `%s' in `%s'" % \
4573  (globaltag, connect_name)
4574  if output.find(".ALL_TABLES not found") > -1:
4575  msg = "%s\n" \
4576  "Missing database account `%s'" % \
4577  (msg, output.split(".ALL_TABLES")[0].split()[-1])
4578  self.logger.fatal(msg)
4579  self.logger.debug("Command used:")
4580  self.logger.debug(" %s" % cmd)
4581  self.logger.debug("Output received:")
4582  self.logger.debug(output)
4583  raise Error(msg)
4584  if output.find("does not exist") > -1:
4585  self.logger.debug("GlobalTag `%s' does not exist in `%s':" % \
4586  (globaltag, connect_name))
4587  self.logger.debug("Output received:")
4588  self.logger.debug(output)
4589  tag_exists = False
4590  else:
4591  tag_exists = True
4592  self.logger.info(" GlobalTag exists? -> %s" % tag_exists)
4593 
4594  # End of check_globaltag_exists.
4595  return tag_exists
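# NOTE: A minimal sketch, not part of the harvester, of how the
# output of the tag-tree listing is interpreted above. It assumes
# Python 3, where subprocess.getstatusoutput stands in for the
# Python 2 commands.getstatusoutput. The names run_command and
# globaltag_exists are hypothetical.

```python
import subprocess


def run_command(cmd):
    """Run a shell command and return (exit_status, combined_output)."""
    return subprocess.getstatusoutput(cmd)


def globaltag_exists(status, output):
    """Interpret the result of a GlobalTag tag-tree listing.

    Raises RuntimeError if the check itself failed. Otherwise the tag
    is taken to exist unless the output says it does not.
    """
    if status != 0 or "error" in output:
        raise RuntimeError("Could not check existence of GlobalTag")
    return "does not exist" not in output
```

# Keeping the parsing separate from the command execution makes the
# decision logic testable without access to the conditions database.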
4596 
4597  ##########
4598 
4599  def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name):
4600  """Check if globaltag contains the required RefHistos key.
4601 
4602  """
4603 
4604  # Check for the key required to use reference histograms.
4605  tag_contains_key = None
4606  ref_hist_key = "RefHistos"
4607  self.logger.info("Checking existence of reference " \
4608  "histogram key `%s' in GlobalTag `%s'" % \
4609  (ref_hist_key, globaltag))
4610  self.logger.debug(" (Using database connection `%s')" % \
4611  connect_name)
4612  cmd = "cmscond_tagtree_list -c %s -T %s -n %s" % \
4613  (connect_name, globaltag, ref_hist_key)
4614  (status, output) = commands.getstatusoutput(cmd)
4615  if status != 0 or \
4616  output.find("error") > -1:
4617  msg = "Could not check existence of key `%s' in `%s'" % \
4618  (ref_hist_key, connect_name)
4619  self.logger.fatal(msg)
4620  self.logger.debug("Command used:")
4621  self.logger.debug(" %s" % cmd)
4622  self.logger.debug("Output received:")
4623  self.logger.debug(" %s" % output)
4624  raise Error(msg)
4625  if len(output) < 1:
4626  self.logger.debug("Required key for use of reference " \
4627  "histograms `%s' does not exist " \
4628  "in GlobalTag `%s':" % \
4629  (ref_hist_key, globaltag))
4630  self.logger.debug("Output received:")
4631  self.logger.debug(output)
4632  tag_contains_key = False
4633  else:
4634  tag_contains_key = True
4635 
4636  self.logger.info(" GlobalTag contains `%s' key? -> %s" % \
4637  (ref_hist_key, tag_contains_key))
4638 
4639  # End of check_globaltag_contains_ref_hist_key.
4640  return tag_contains_key
4641 
4642  ##########
4643 
4644  def check_ref_hist_tag(self, tag_name):
4645  """Check the existence of tag_name in database connect_name.
4646 
4647  Check if tag_name exists as a reference histogram tag in the
4648  database given by self.frontier_connection_name['refhists'].
4649 
4650  """
4651 
4652  connect_name = self.frontier_connection_name["refhists"]
4653  connect_name += self.db_account_name_cms_cond_dqm_summary()
4654 
4655  self.logger.debug("Checking existence of reference " \
4656  "histogram tag `%s'" % \
4657  tag_name)
4658  self.logger.debug(" (Using database connection `%s')" % \
4659  connect_name)
4660 
4661  cmd = "cmscond_list_iov -c %s" % \
4662  connect_name
4663  (status, output) = commands.getstatusoutput(cmd)
4664  if status != 0:
4665  msg = "Could not check existence of tag `%s' in `%s'" % \
4666  (tag_name, connect_name)
4667  self.logger.fatal(msg)
4668  self.logger.debug("Command used:")
4669  self.logger.debug(" %s" % cmd)
4670  self.logger.debug("Output received:")
4671  self.logger.debug(output)
4672  raise Error(msg)
4673  if tag_name not in output.split():
4674  self.logger.debug("Reference histogram tag `%s' " \
4675  "does not exist in `%s'" % \
4676  (tag_name, connect_name))
4677  self.logger.debug(" Existing tags: `%s'" % \
4678  "', `".join(output.split()))
4679  tag_exists = False
4680  else:
4681  tag_exists = True
4682  self.logger.debug(" Reference histogram tag exists? " \
4683  "-> %s" % tag_exists)
4684 
4685  # End of check_ref_hist_tag.
4686  return tag_exists
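# NOTE: A minimal sketch of the membership test used above. The tag
# name has to match a whole whitespace-separated token of the IOV
# listing, so a tag that is merely a prefix of another tag does not
# count as existing. The function name is hypothetical.

```python
def ref_hist_tag_exists(tag_name, listing_output):
    """Return True if tag_name appears as a whole token in the listing."""
    return tag_name in listing_output.split()
```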
4687 
4688  ##########
4689 
4690  def create_es_prefer_snippet(self, dataset_name):
4691  """Build the es_prefer snippet for the reference histograms.
4692 
4693  The building of the snippet is wrapped in some care-taking
4694  code that figures out the name of the reference histogram set
4695  and makes sure the corresponding tag exists.
4696 
4697  """
4698 
4699  # Figure out the name of the reference histograms tag.
4700  # NOTE: The existence of these tags has already been checked.
4701  ref_hist_tag_name = self.ref_hist_mappings[dataset_name]
4702 
4703  connect_name = self.frontier_connection_name["refhists"]
4704  connect_name += self.db_account_name_cms_cond_dqm_summary()
4705  record_name = "DQMReferenceHistogramRootFileRcd"
4706 
4707  # Build up the code snippet.
4708  code_lines = []
4709  code_lines.append("from CondCore.DBCommon.CondDBSetup_cfi import *")
4710  code_lines.append("process.ref_hist_source = cms.ESSource(\"PoolDBESSource\", CondDBSetup,")
4711  code_lines.append(" connect = cms.string(\"%s\")," % connect_name)
4712  code_lines.append(" toGet = cms.VPSet(cms.PSet(record = cms.string(\"%s\")," % record_name)
4713  code_lines.append(" tag = cms.string(\"%s\"))," % ref_hist_tag_name)
4714  code_lines.append(" )")
4715  code_lines.append(" )")
4716  code_lines.append("process.es_prefer_ref_hist_source = cms.ESPrefer(\"PoolDBESSource\", \"ref_hist_source\")")
4717 
4718  snippet = "\n".join(code_lines)
4719 
4720  # End of create_es_prefer_snippet.
4721  return snippet
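# NOTE: A minimal sketch of the same snippet built from a single
# template string instead of a list of appended lines. The template
# constant, the function name, and the layout of the continuation
# lines are assumptions. The statements themselves are the ones
# emitted above.

```python
ES_PREFER_TEMPLATE = """\
from CondCore.DBCommon.CondDBSetup_cfi import *
process.ref_hist_source = cms.ESSource("PoolDBESSource", CondDBSetup,
    connect = cms.string("{connect}"),
    toGet = cms.VPSet(cms.PSet(record = cms.string("{record}"),
                               tag = cms.string("{tag}")),
                      )
    )
process.es_prefer_ref_hist_source = cms.ESPrefer("PoolDBESSource", "ref_hist_source")"""


def build_es_prefer_snippet(connect, record, tag):
    """Fill in the connection string, record name and tag name."""
    return ES_PREFER_TEMPLATE.format(connect=connect, record=record, tag=tag)
```

# The result is plain text that gets appended to the generated job
# configuration. It is only meaningful inside a CMSSW configuration
# where `process' and `cms' are defined.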
4722 
4723  ##########
4724 
4725  def create_harvesting_config(self, dataset_name):
4726  """Create the Python configuration for harvesting.
4727 
4728  The basic configuration is created by
4729  Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4730  what cmsDriver.py does.) After that we add some specials
4731  ourselves.
4732 
4733  NOTE: On one hand it may not be nice to circumvent
4734  cmsDriver.py, on the other hand cmsDriver.py does not really
4735  do anything itself. All the real work is done by the
4736  ConfigBuilder so there is not much risk that we miss out on
4737  essential developments of cmsDriver in the future.
4738 
4739  """
4740 
4741  # Setup some options needed by the ConfigBuilder.
4742  config_options = defaultOptions
4743 
4744  # These are fixed for all kinds of harvesting jobs. Some of
4745  # them are not needed for the harvesting config, but to keep
4746  # the ConfigBuilder happy.
4747  config_options.name = "harvesting"
4748  config_options.scenario = "pp"
4749  config_options.number = 1
4750  config_options.arguments = self.ident_string()
4751  config_options.evt_type = config_options.name
4752  config_options.customisation_file = None
4753  config_options.filein = "dummy_value"
4754  config_options.filetype = "EDM"
4755  # This seems to be new in CMSSW 3.3.X, no clue what it does.
4756  config_options.gflash = "dummy_value"
4757  # This seems to be new in CMSSW 3.3.0.pre6, no clue what it
4758  # does.
4759  #config_options.himix = "dummy_value"
4760  config_options.dbsquery = ""
4761 
4762  ###
4763 
4764  # These options depend on the type of harvesting we're doing
4765  # and are stored in self.harvesting_info.
4766 
4767  config_options.step = "HARVESTING:%s" % \
4768  self.harvesting_info[self.harvesting_type] \
4769  ["step_string"]
4770  config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4771  ["beamspot"]
4772  config_options.eventcontent = self.harvesting_info \
4773  [self.harvesting_type] \
4774  ["eventcontent"]
4775  config_options.harvesting = self.harvesting_info \
4776  [self.harvesting_type] \
4777  ["harvesting"]
4778 
4779  ###
4780 
4781  # This one is required (see also above) for each dataset.
4782 
4783  datatype = self.datasets_information[dataset_name]["datatype"]
4784  config_options.isMC = (datatype.lower() == "mc")
4785  config_options.isData = (datatype.lower() == "data")
4786  globaltag = self.datasets_information[dataset_name]["globaltag"]
4787 
4788  config_options.conditions = self.format_conditions_string(globaltag)
4789 
4790  ###
4791 
4792  if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4793  # This is the case for 3.3.X.
4794  config_builder = ConfigBuilder(config_options, with_input=True)
4795  else:
4796  # This is the case in older CMSSW versions.
4797  config_builder = ConfigBuilder(config_options)
4798  config_builder.prepare(True)
4799  config_contents = config_builder.pythonCfgCode
4800 
4801  ###
4802 
4803  # Add our signature to the top of the configuration, and add
4804  # some markers to the head and the tail of the Python code
4805  # generated by the ConfigBuilder.
4806  marker_lines = []
4807  sep = "#" * 30
4808  marker_lines.append(sep)
4809  marker_lines.append("# Code between these markers was generated by")
4810  marker_lines.append("# Configuration.PyReleaseValidation." \
4811  "ConfigBuilder")
4812 
4813  marker_lines.append(sep)
4814  marker = "\n".join(marker_lines)
4815 
4816  tmp = [self.config_file_header()]
4817  tmp.append("")
4818  tmp.append(marker)
4819  tmp.append("")
4820  tmp.append(config_contents)
4821  tmp.append("")
4822  tmp.append(marker)
4823  tmp.append("")
4824  config_contents = "\n".join(tmp)
4825 
4826  ###
4827 
4828  # Now we add some stuff of our own.
4829  customisations = [""]
4830 
4831  customisations.append("# Now follow some customisations")
4832  customisations.append("")
4833  connect_name = self.frontier_connection_name["globaltag"]
4834  connect_name += self.db_account_name_cms_cond_globaltag()
4835  customisations.append("process.GlobalTag.connect = \"%s\"" % \
4836  connect_name)
4837 
4838 
4839  if self.saveByLumiSection == True:
4840  customisations.append("process.dqmSaver.saveByLumiSection = 1")
4841  ##
4842  ##
4843 
4844  customisations.append("")
4845 
4846  # About the reference histograms... For data there is only one
4847  # set of references and those are picked up automatically
4848  # based on the GlobalTag. For MC we have to do some more work
4849  # since the reference histograms to be used depend on the MC
4850  # sample at hand. In this case we glue in an es_prefer snippet
4851  # to pick up the references. We do this only for RelVals since
4852  # for MC there are no meaningful references so far.
4853 
4854  # NOTE: Due to the lack of meaningful references for
4855  # MC samples reference histograms are explicitly
4856  # switched off in this case.
4857 
4858  use_es_prefer = (self.harvesting_type == "RelVal")
4859  use_refs = use_es_prefer or \
4860  (not self.harvesting_type == "MC")
4861  # Allow global override.
4862  use_refs = use_refs and self.use_ref_hists
4863 
4864  if not use_refs:
4865  # Disable reference histograms explicitly. The histograms
4866  # are loaded by the dqmRefHistoRootFileGetter
4867  # EDAnalyzer. This analyzer can be run from several
4868  # sequences. Here we remove it from each sequence that
4869  # exists.
4870  customisations.append("print \"Not using reference histograms\"")
4871  customisations.append("if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4872  customisations.append(" for (sequence_name, sequence) in process.sequences.iteritems():")
4873  customisations.append(" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4874  customisations.append(" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4875  customisations.append(" sequence_name")
4876  customisations.append("process.dqmSaver.referenceHandling = \"skip\"")
4877  else:
4878  # This makes sure all reference histograms are saved to
4879  # the output ROOT file.
4880  customisations.append("process.dqmSaver.referenceHandling = \"all\"")
4881  if use_es_prefer:
4882  es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4883  customisations.append(es_prefer_snippet)
4884 
4885  # Make sure we get the `workflow' correct. As far as I can see
4886  # this is only important for the output file name.
4887  workflow_name = dataset_name
4888  if self.harvesting_mode == "single-step-allow-partial":
4889  workflow_name += "_partial"
4890  customisations.append("process.dqmSaver.workflow = \"%s\"" % \
4891  workflow_name)
4892 
4893  # BUG BUG BUG
4894  # This still does not work. The current two-step harvesting
4895  # efforts are on hold waiting for the solution to come from
4896  # elsewhere. (In this case the elsewhere is Daniele Spiga.)
4897 
4898 ## # In case this file is the second step (the real harvesting
4899 ## # step) of the two-step harvesting we have to tell it to use
4900 ## # our local files.
4901 ## if self.harvesting_mode == "two-step":
4902 ## castor_dir = self.datasets_information[dataset_name] \
4903 ## ["castor_path"][run]
4904 ## customisations.append("")
4905 ## customisations.append("# This is the second step (the real")
4906 ## customisations.append("# harvesting step) of a two-step")
4907 ## customisations.append("# harvesting procedure.")
4908 ## # BUG BUG BUG
4909 ## # To be removed in production version.
4910 ## customisations.append("import pdb")
4911 ## # BUG BUG BUG end
4912 ## customisations.append("import commands")
4913 ## customisations.append("import os")
4914 ## customisations.append("castor_dir = \"%s\"" % castor_dir)
4915 ## customisations.append("cmd = \"rfdir %s\" % castor_dir")
4916 ## customisations.append("(status, output) = commands.getstatusoutput(cmd)")
4917 ## customisations.append("if status != 0:")
4918 ## customisations.append(" print \"ERROR\"")
4919 ## customisations.append(" raise Exception, \"ERROR\"")
4920 ## customisations.append("file_names = [os.path.join(\"rfio:%s\" % path, i) for i in output.split() if i.startswith(\"EDM_summary\") and i.endswith(\".root\")]")
4921 ## #customisations.append("pdb.set_trace()")
4922 ## customisations.append("process.source.fileNames = cms.untracked.vstring(*file_names)")
4923 ## customisations.append("")
4924 
4925  # BUG BUG BUG end
4926 
4927  config_contents = config_contents + "\n".join(customisations)
4928 
4929  ###
4930 
4931  # End of create_harvesting_config.
4932  return config_contents
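# NOTE: A minimal sketch of the reference-histogram decision made in
# create_harvesting_config, written as a pure function. The function
# name and the returned tuple are assumptions. The conditions mirror
# the use_es_prefer / use_refs logic above.

```python
def reference_handling(harvesting_type, use_ref_hists=True):
    """Return (referenceHandling value, whether to add the es_prefer snippet)."""
    use_es_prefer = (harvesting_type == "RelVal")
    use_refs = (use_es_prefer or harvesting_type != "MC") and use_ref_hists
    if not use_refs:
        # References are switched off: nothing is preferred either.
        return ("skip", False)
    return ("all", use_es_prefer)
```

# Only RelVal harvesting gets the es_prefer snippet, MC harvesting
# never uses references, and the global use_ref_hists switch can turn
# references off for every harvesting type.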
4933 
4934 ## ##########
4935 
4936 ## def create_harvesting_config_two_step(self, dataset_name):
4937 ## """Create the Python harvesting configuration for two-step
4938 ## harvesting.
4939 
4940 ## """
4941 
4942 ## # BUG BUG BUG
4943 ## config_contents = self.create_harvesting_config_single_step(dataset_name)
4944 ## # BUG BUG BUG end
4945 
4946 ## # End of create_harvesting_config_two_step.
4947 ## return config_contents
4948 
4949  ##########
4950 
4951  def create_me_extraction_config(self, dataset_name):
4952  """Create the Python configuration for the ME-extraction step.
4953 
4954  """
4955 
4956  # Big chunk of hard-coded Python. Not such a big deal since
4957  # this does not do much and is not likely to break.
4958  tmp = []
4959  tmp.append(self.config_file_header())
4960  tmp.append("")
4961  tmp.append("import FWCore.ParameterSet.Config as cms")
4962  tmp.append("")
4963  tmp.append("process = cms.Process(\"ME2EDM\")")
4964  tmp.append("")
4965  tmp.append("# Import of standard configurations")
4966  tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
4967  tmp.append("")
4968  tmp.append("# We don't really process any events, just keep this set to one to")
4969  tmp.append("# make sure things work.")
4970  tmp.append("process.maxEvents = cms.untracked.PSet(")
4971  tmp.append(" input = cms.untracked.int32(1)")
4972  tmp.append(" )")
4973  tmp.append("")
4974  tmp.append("process.options = cms.untracked.PSet(")
4975  tmp.append(" Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
4976  tmp.append(" )")
4977  tmp.append("")
4978  tmp.append("process.source = cms.Source(\"PoolSource\",")
4979  tmp.append(" processingMode = \\")
4980  tmp.append(" cms.untracked.string(\"RunsAndLumis\"),")
4981  tmp.append(" fileNames = \\")
4982  tmp.append(" cms.untracked.vstring(\"no_file_specified\")")
4983  tmp.append(" )")
4984  tmp.append("")
4985  tmp.append("# Output definition: drop everything except for the monitoring.")
4986  tmp.append("process.output = cms.OutputModule(")
4987  tmp.append(" \"PoolOutputModule\",")
4988  tmp.append(" outputCommands = \\")
4989  tmp.append(" cms.untracked.vstring(\"drop *\", \\")
4990  tmp.append(" \"keep *_MEtoEDMConverter_*_*\"),")
4991  output_file_name = self. \
4992  create_output_file_name(dataset_name)
4993  tmp.append(" fileName = \\")
4994  tmp.append(" cms.untracked.string(\"%s\")," % output_file_name)
4995  tmp.append(" dataset = cms.untracked.PSet(")
4996  tmp.append(" dataTier = cms.untracked.string(\"RECO\"),")
4997  tmp.append(" filterName = cms.untracked.string(\"\")")
4998  tmp.append(" )")
4999  tmp.append(" )")
5000  tmp.append("")
5001  tmp.append("# Additional output definition")
5002  tmp.append("process.out_step = cms.EndPath(process.output)")
5003  tmp.append("")
5004  tmp.append("# Schedule definition")
5005  tmp.append("process.schedule = cms.Schedule(process.out_step)")
5006  tmp.append("")
5007 
5008  config_contents = "\n".join(tmp)
5009 
5010  # End of create_me_extraction_config.
5011  return config_contents
5012 
5013  ##########
5014 
5015 ## def create_harvesting_config(self, dataset_name):
5016 ## """Create the Python harvesting configuration for a given job.
5017 
5018 ## NOTE: The reason to have a single harvesting configuration per
5019 ## sample is to be able to specify the GlobalTag corresponding to
5020 ## each sample. Since it has been decided that (apart from the
5021 ## prompt reco) datasets cannot contain runs with different
5022 ## GlobalTags, we don't need a harvesting config per run.
5023 
5024 ## NOTE: This is the place where we distinguish between
5025 ## single-step and two-step harvesting modes (at least for the
5026 ## Python job configuration).
5027 
5028 ## """
5029 
5030 ## ###
5031 
5032 ## if self.harvesting_mode == "single-step":
5033 ## config_contents = self.create_harvesting_config_single_step(dataset_name)
5034 ## elif self.harvesting_mode == "two-step":
5035 ## config_contents = self.create_harvesting_config_two_step(dataset_name)
5036 ## else:
5037 ## # Impossible harvesting mode, we should never get here.
5038 ## assert False, "ERROR: unknown harvesting mode `%s'" % \
5039 ## self.harvesting_mode
5040 
5041 ## ###
5042 
5043 ## # End of create_harvesting_config.
5044 ## return config_contents
5045 
5046  ##########
5047 
5048  def write_crab_config(self):
5049  """Write a CRAB job configuration file.
5050 
5051  """
5052 
5053  self.logger.info("Writing CRAB configuration...")
5054 
5055  file_name_base = "crab.cfg"
5056 
5057  # Create CRAB configuration.
5058  crab_contents = self.create_crab_config()
5059 
5060  # Write configuration to file.
5061  crab_file_name = file_name_base
5062  try:
5063  crab_file = file(crab_file_name, "w")
5064  crab_file.write(crab_contents)
5065  crab_file.close()
5066  except IOError:
5067  self.logger.fatal("Could not write " \
5068  "CRAB configuration to file `%s'" % \
5069  crab_file_name)
5070  raise Error("ERROR: Could not write to file `%s'!" % \
5071  crab_file_name)
5072 
5073  # End of write_crab_config.
5074 
5075  ##########
5076 
5077  def write_multicrab_config(self):
5078  """Write a multi-CRAB job configuration file.
5079 
5080  """
5081 
5082  self.logger.info("Writing multi-CRAB configuration...")
5083 
5084  file_name_base = "multicrab.cfg"
5085 
5086  # Create multi-CRAB configuration.
5087  multicrab_contents = self.create_multicrab_config()
5088 
5089  # Write configuration to file.
5090  multicrab_file_name = file_name_base
5091  try:
5092  multicrab_file = file(multicrab_file_name, "w")
5093  multicrab_file.write(multicrab_contents)
5094  multicrab_file.close()
5095  except IOError:
5096  self.logger.fatal("Could not write " \
5097  "multi-CRAB configuration to file `%s'" % \
5098  multicrab_file_name)
5099  raise Error("ERROR: Could not write to file `%s'!" % \
5100  multicrab_file_name)
5101 
5102  # End of write_multicrab_config.
5103 
5104  ##########
5105 
5106  def write_harvesting_config(self, dataset_name):
5107  """Write a harvesting job configuration Python file.
5108 
5109  NOTE: This knows nothing about single-step or two-step
5110  harvesting. That's all taken care of by
5111  create_harvesting_config.
5112 
5113  """
5114 
5115  self.logger.debug("Writing harvesting configuration for `%s'..." % \
5116  dataset_name)
5117 
5118  # Create Python configuration.
5119  config_contents = self.create_harvesting_config(dataset_name)
5120 
5121  # Write configuration to file.
5122  config_file_name = self. \
5124  try:
5125  config_file = file(config_file_name, "w")
5126  config_file.write(config_contents)
5127  config_file.close()
5128  except IOError:
5129  self.logger.fatal("Could not write " \
5130  "harvesting configuration to file `%s'" % \
5131  config_file_name)
5132  raise Error("ERROR: Could not write to file `%s'!" % \
5133  config_file_name)
5134 
5135  # End of write_harvesting_config.
5136 
5137  ##########
5138 
5139  def write_me_extraction_config(self, dataset_name):
5140  """Write an ME-extraction configuration Python file.
5141 
5142  This `ME-extraction' (ME = Monitoring Element) is the first
5143  step of the two-step harvesting.
5144 
5145  """
5146 
5147  self.logger.debug("Writing ME-extraction configuration for `%s'..." % \
5148  dataset_name)
5149 
5150  # Create Python configuration.
5151  config_contents = self.create_me_extraction_config(dataset_name)
5152 
5153  # Write configuration to file.
5154  config_file_name = self. \
5156  try:
5157  config_file = file(config_file_name, "w")
5158  config_file.write(config_contents)
5159  config_file.close()
5160  except IOError:
5161  self.logger.fatal("Could not write " \
5162  "ME-extraction configuration to file `%s'" % \
5163  config_file_name)
5164  raise Error("ERROR: Could not write to file `%s'!" % \
5165  config_file_name)
5166 
5167  # End of write_me_extraction_config.
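# NOTE: The write_* methods above all follow the same pattern: build
# the contents, write them to a file, and turn an IOError into the
# harvester's own Error. A minimal sketch of that pattern as one
# helper, assuming Python 3. The helper name is hypothetical and the
# Error class below is only a stand-in for the harvester's exception.

```python
import os
import tempfile


class Error(Exception):
    """Stand-in for the harvester's Error exception."""


def write_config_file(file_name, contents):
    """Write contents to file_name, reporting failures as Error."""
    try:
        # The with-block closes the file even if write() fails.
        with open(file_name, "w") as out_file:
            out_file.write(contents)
    except IOError:
        raise Error("ERROR: Could not write to file `%s'!" % file_name)


# Usage: write a dummy configuration into a scratch directory.
scratch_dir = tempfile.mkdtemp()
crab_file_name = os.path.join(scratch_dir, "crab.cfg")
write_config_file(crab_file_name, "[CRAB]\n")
```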
5168 
5169  ##########
5170 
5171 
5172  def ref_hist_mappings_needed(self, dataset_name=None):
5173  """Check if we need to load and check the reference mappings.
5174 
5175  For data the reference histograms should be taken
5176  automatically from the GlobalTag, so we don't need any
5177  mappings. For RelVals we need to know a mapping to be used in
5178  the es_prefer code snippet (different references for each of
5179  the datasets.)
5180 
5181  WARNING: This implementation is a bit convoluted.
5182 
5183  """
5184 
5185  # If no dataset name given, do everything, otherwise check
5186  # only this one dataset.
5187  if dataset_name is not None:
5188  data_type = self.datasets_information[dataset_name] \
5189  ["datatype"]
5190  mappings_needed = (data_type == "mc")
5191  # DEBUG DEBUG DEBUG
5192  if not mappings_needed:
5193  assert data_type == "data"
5194  # DEBUG DEBUG DEBUG end
5195  else:
5196  tmp = [self.ref_hist_mappings_needed(dataset_name) \
5197  for dataset_name in \
5198  self.datasets_information.keys()]
5199  mappings_needed = (True in tmp)
5200 
5201  # End of ref_hist_mappings_needed.
5202  return mappings_needed
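# NOTE: A minimal sketch of the same check without the recursive
# call, assuming the layout of datasets_information used above (one
# dictionary per dataset with a "datatype" entry). The dataset names
# in any usage are placeholders.

```python
def ref_hist_mappings_needed(datasets_information, dataset_name=None):
    """Return True if reference mappings are needed.

    With a dataset name only that dataset is checked, otherwise the
    answer is True as soon as any dataset is of type "mc".
    """
    if dataset_name is not None:
        return datasets_information[dataset_name]["datatype"] == "mc"
    return any(info["datatype"] == "mc"
               for info in datasets_information.values())
```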
5203 
5204  ##########
5205 
5206  def load_ref_hist_mappings(self):
5207  """Load the reference histogram mappings from file.
5208 
5209  The dataset name to reference histogram name mappings are read
5210  from a text file specified in self.ref_hist_mappings_file_name.
5211 
5212  """
5213 
5214  # DEBUG DEBUG DEBUG
5215  assert len(self.ref_hist_mappings) < 1, \
5216  "ERROR Should not be RE-loading " \
5217  "reference histogram mappings!"
5218  # DEBUG DEBUG DEBUG end
5219 
5220  self.logger.info("Loading reference histogram mappings " \
5221  "from file `%s'" % \
5222  self.ref_hist_mappings_file_name)
5223 
5224  mappings_lines = None
5225  try:
5226  mappings_file = file(self.ref_hist_mappings_file_name, "r")
5227  mappings_lines = mappings_file.readlines()
5228  mappings_file.close()
5229  except IOError:
5230  msg = "ERROR: Could not open reference histogram mapping "\
5231  "file `%s'" % self.ref_hist_mappings_file_name
5232  self.logger.fatal(msg)
5233  raise Error(msg)
5234 
5235  ##########
5236 
5237  # The format we expect is: two white-space separated pieces
5238  # per line. The first the dataset name for which the reference
5239  # should be used, the second one the name of the reference
5240  # histogram in the database.
5241 
5242  for mapping in mappings_lines:
5243  # Skip comment lines.
5244  if not mapping.startswith("#"):
5245  mapping = mapping.strip()
5246  if len(mapping) > 0:
5247  mapping_pieces = mapping.split()
5248  if len(mapping_pieces) != 2:
5249  msg = "ERROR: The reference histogram mapping " \
5250  "file contains a line I don't " \
5251  "understand:\n %s" % mapping
5252  self.logger.fatal(msg)
5253  raise Error(msg)
5254  dataset_name = mapping_pieces[0].strip()
5255  ref_hist_name = mapping_pieces[1].strip()
5256  # We don't want people to accidentally specify
5257  # multiple mappings for the same dataset. Just
5258  # don't accept those cases.
5259  if self.ref_hist_mappings.has_key(dataset_name):
5260  msg = "ERROR: The reference histogram mapping " \
5261  "file contains multiple mappings for " \
5262  "dataset `%s'." % dataset_name
5263  self.logger.fatal(msg)
5264  raise Error(msg)
5265 
5266  # All is well that ends well.
5267  self.ref_hist_mappings[dataset_name] = ref_hist_name
5268 
5269  ##########
5270 
5271  self.logger.info(" Successfully loaded %d mapping(s)" % \
5272  len(self.ref_hist_mappings))
5273  max_len = max([len(i) for i in self.ref_hist_mappings.keys()] or [0])
5274  for (map_from, map_to) in self.ref_hist_mappings.iteritems():
5275  self.logger.info(" %-*s -> %s" % \
5276  (max_len, map_from, map_to))
5277 
5278  # End of load_ref_hist_mappings.
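# NOTE: A minimal sketch of the mapping-file format described above:
# two whitespace-separated pieces per line, comment lines starting
# with `#', and no duplicate dataset names. The function name is
# hypothetical and ValueError stands in for the harvester's Error.
# Unlike the code above, this sketch strips each line before testing
# for a comment, so indented comment lines are skipped as well.

```python
def parse_ref_hist_mappings(lines):
    """Parse mapping lines into {dataset_name: ref_hist_name}."""
    mappings = {}
    for line in lines:
        line = line.strip()
        # Skip empty lines and comment lines.
        if not line or line.startswith("#"):
            continue
        pieces = line.split()
        if len(pieces) != 2:
            raise ValueError("Cannot parse mapping line: %r" % line)
        dataset_name, ref_hist_name = pieces
        # Refuse multiple mappings for the same dataset.
        if dataset_name in mappings:
            raise ValueError("Multiple mappings for dataset `%s'" %
                             dataset_name)
        mappings[dataset_name] = ref_hist_name
    return mappings
```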
5279 
5280  ##########
5281 
5282  def check_ref_hist_mappings(self):
5283  """Make sure all necessary reference histograms exist.
5284 
5285  Check that for each of the datasets to be processed a
5286  reference histogram is specified and that that histogram
5287  exists in the database.
5288 
5289  NOTE: There's a little complication here. Since this whole
5290  thing was designed to allow (in principle) harvesting of both
5291  data and MC datasets in one go, we need to be careful to check
5292  the availability of reference mappings only for those
5293  datasets that need it.
5294 
5295  """
5296 
5297  self.logger.info("Checking reference histogram mappings")
5298 
5299  for dataset_name in self.datasets_to_use:
5300  try:
5301  ref_hist_name = self.ref_hist_mappings[dataset_name]
5302  except KeyError:
5303  msg = "ERROR: No reference histogram mapping found " \
5304  "for dataset `%s'" % \
5305  dataset_name
5306  self.logger.fatal(msg)
5307  raise Error(msg)
5308 
5309  if not self.check_ref_hist_tag(ref_hist_name):
5310  msg = "Reference histogram tag `%s' " \
5311  "(used for dataset `%s') does not exist!" % \
5312  (ref_hist_name, dataset_name)
5313  self.logger.fatal(msg)
5314  raise Usage(msg)
5315 
5316  self.logger.info(" Done checking reference histogram mappings.")
5317 
5318  # End of check_ref_hist_mappings.
5319 
5320  ##########
5321 
5322  def build_datasets_information(self):
5323  """Obtain all information on the datasets that we need to run.
5324 
5325  Use DBS to figure out all required information on our
5326  datasets, like the run numbers and the GlobalTag. All
5327  information is stored in the datasets_information member
5328  variable.
5329 
5330  """
5331 
5332  # Get a list of runs in the dataset.
5333  # NOTE: The harvesting has to be done run-by-run, so we
5334  # split up datasets based on the run numbers. Strictly
5335  # speaking this is not (yet?) necessary for Monte Carlo
5336  # since all those samples use run number 1. Still, this
5337  # general approach should work for all samples.
5338 
5339  # Now loop over all datasets in the list and process them.
5340  # NOTE: This processing has been split into several loops
5341  # to be easier to follow, sacrificing a bit of efficiency.
5342  self.datasets_information = {}
5343  self.logger.info("Collecting information for all datasets to process")
5344  dataset_names = self.datasets_to_use.keys()
5345  dataset_names.sort()
5346  for dataset_name in dataset_names:
5347 
5348  # Tell the user which dataset: nice with many datasets.
5349  sep_line = "-" * 30
5350  self.logger.info(sep_line)
5351  self.logger.info(" `%s'" % dataset_name)
5352  self.logger.info(sep_line)
5353 
5354  runs = self.dbs_resolve_runs(dataset_name)
5355  self.logger.info(" found %d run(s)" % len(runs))
5356  if len(runs) > 0:
5357  self.logger.debug(" run number(s): %s" % \
5358  ", ".join([str(i) for i in runs]))
5359  else:
5360  # DEBUG DEBUG DEBUG
5361  # This should never happen after the DBS checks.
5362  self.logger.warning(" --> skipping dataset "
5363  "without any runs")
5364  assert False, "Panic: found a dataset without runs " \
5365  "after DBS checks!"
5366  # DEBUG DEBUG DEBUG end
5367 
5368  cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5369  self.logger.info(" found CMSSW version `%s'" % cmssw_version)
5370 
5371  # Figure out if this is data or MC.
5372  datatype = self.dbs_resolve_datatype(dataset_name)
5373  self.logger.info(" sample is data or MC? --> %s" % \
5374  datatype)
5375 
5376  ###
5377 
5378  # Try and figure out the GlobalTag to be used.
5379  if self.globaltag is None:
5380  globaltag = self.dbs_resolve_globaltag(dataset_name)
5381  else:
5382  globaltag = self.globaltag
5383 
5384  self.logger.info(" found GlobalTag `%s'" % globaltag)
5385 
5386  # DEBUG DEBUG DEBUG
5387  if globaltag == "":
5388  # Actually we should not even reach this point, after
5389  # our dataset sanity checks.
5390  assert datatype == "data", \
5391  "ERROR Empty GlobalTag for MC dataset!!!"
5392  # DEBUG DEBUG DEBUG end
5393 
5394  ###
5395 
5396  # DEBUG DEBUG DEBUG
5397  #tmp = self.dbs_check_dataset_spread_old(dataset_name)
5398  # DEBUG DEBUG DEBUG end
5399  sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5400 
5401  # Extract the total event counts.
5402  num_events = {}
5403  for run_number in sites_catalog.keys():
5404  num_events[run_number] = sites_catalog \
5405  [run_number]["all_sites"]
5406  del sites_catalog[run_number]["all_sites"]
5407 
5408  # Extract the information about whether or not datasets
5409  # are mirrored.
5410  mirror_catalog = {}
5411  for run_number in sites_catalog.keys():
5412  mirror_catalog[run_number] = sites_catalog \
5413  [run_number]["mirrored"]
5414  del sites_catalog[run_number]["mirrored"]
5415 
5416  # BUG BUG BUG
5417  # I think I could now get rid of that and just fill the
5418  # "sites" entry with the `inverse' of this
5419  # num_events_catalog(?).
5420  #num_sites = self.dbs_resolve_dataset_number_of_sites(dataset_name)
5421  #sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5422  #sites_catalog = dict(zip(num_events_catalog.keys(),
5423  # [[j for i in num_events_catalog.values() for j in i.keys()]]))
5424  # BUG BUG BUG end
5425 
5426 ## # DEBUG DEBUG DEBUG
5427 ## # This is probably only useful to make sure we don't muck
5428 ## # things up, right?
5429 ## # Figure out across how many sites this sample has been spread.
5430 ## if num_sites == 1:
5431 ## self.logger.info(" sample is contained at a single site")
5432 ## else:
5433 ## self.logger.info(" sample is spread across %d sites" % \
5434 ## num_sites)
5435 ## if num_sites < 1:
5436 ## # NOTE: This _should not_ happen with any valid dataset.
5437 ## self.logger.warning(" --> skipping dataset which is not " \
5438 ## "hosted anywhere")
5439 ## # DEBUG DEBUG DEBUG end
5440 
5441  # Now put everything in a place where we can find it again
5442  # if we need it.
5443  self.datasets_information[dataset_name] = {}
5444  self.datasets_information[dataset_name]["runs"] = runs
5445  self.datasets_information[dataset_name]["cmssw_version"] = \
5446  cmssw_version
5447  self.datasets_information[dataset_name]["globaltag"] = globaltag
5448  self.datasets_information[dataset_name]["datatype"] = datatype
5449  self.datasets_information[dataset_name]["num_events"] = num_events
5450  self.datasets_information[dataset_name]["mirrored"] = mirror_catalog
5451  self.datasets_information[dataset_name]["sites"] = sites_catalog
5452 
5453  # Each run of each dataset has a different CASTOR output
5454  # path.
5455  castor_path_common = self.create_castor_path_name_common(dataset_name)
5456  self.logger.info(" output will go into `%s'" % \
5457  castor_path_common)
5458 
5459  castor_paths = dict(zip(runs,
5460  [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5461  for i in runs]))
5462  for path_name in castor_paths.values():
5463  self.logger.debug(" %s" % path_name)
5464  self.datasets_information[dataset_name]["castor_path"] = \
5465  castor_paths
5466 
5467  # End of build_datasets_information.
5468 
5469  ##########
5470 
5471  def show_exit_message(self):
5472  """Tell the user what to do now, after this part is done.
5473 
5474  This should provide the user with some (preferably
5475  copy-pasteable) instructions on what to do now with the setups
5476  and files that have been created.
5477 
5478  """
5479 
5480  # TODO TODO TODO
5481  # This could be improved a bit.
5482  # TODO TODO TODO end
5483 
5484  sep_line = "-" * 60
5485 
5486  self.logger.info("")
5487  self.logger.info(sep_line)
5488  self.logger.info(" Configuration files have been created.")
5489  self.logger.info(" From here on please follow the usual CRAB instructions.")
5490  self.logger.info(" Quick copy-paste instructions are shown below.")
5491  self.logger.info(sep_line)
5492 
5493  self.logger.info("")
5494  self.logger.info(" Create all CRAB jobs:")
5495  self.logger.info(" multicrab -create")
5496  self.logger.info("")
5497  self.logger.info(" Submit all CRAB jobs:")
5498  self.logger.info(" multicrab -submit")
5499  self.logger.info("")
5500  self.logger.info(" Check CRAB status:")
5501  self.logger.info(" multicrab -status")
5502  self.logger.info("")
5503 
5504  self.logger.info("")
5505  self.logger.info(" For more information please see the CMS Twiki:")
5506  self.logger.info(" %s" % twiki_url)
5507  self.logger.info(sep_line)
5508 
5509  # If there were any jobs for which we could not find a
5510  # matching site show a warning message about that.
5511  if not self.all_sites_found:
5512  self.logger.warning(" For some of the jobs no matching " \
5513  "site could be found")
5514  self.logger.warning(" --> please scan your multicrab.cfg" \
5515  " for occurrences of `%s'." % \
5516  self.no_matching_site_found_str)
5517  self.logger.warning(" You will have to fix those " \
5518  "by hand, sorry.")
5519 
5520  # End of show_exit_message.
5521 
5522  ##########
5523 
5524  def run(self):
5525  "Main entry point of the CMS harvester."
5526 
5527  # Start with a positive thought.
5528  exit_code = 0
5529 
5530  try:
5531 
5532  try:
5533 
5534  # Parse all command line options and arguments
5535  self.parse_cmd_line_options()
5536  # and check that they make sense.
5537  self.check_input_status()
5538 
5539  # Check if CMSSW is set up.
5540  self.check_cmssw()
5541 
5542  # Check if DBS is set up,
5543  self.check_dbs()
5544  # and if all is fine set up the Python side.
5545  self.setup_dbs()
5546 
5547  # Fill our dictionary with all the required info we
5548  # need to understand harvesting jobs. This needs to be
5549  # done after the CMSSW version is known.
5550  self.setup_harvesting_info()
5551 
5552  # Obtain list of dataset names to consider
5553  self.build_dataset_use_list()
5554  # and the list of dataset names to ignore.
5555  self.build_dataset_ignore_list()
5556 
5557  # The same for the runs lists (if specified).
5558  self.build_runs_use_list()
5559  self.build_runs_ignore_list()
5560 
5561  # Process the list of datasets to ignore and fold that
5562  # into the list of datasets to consider.
5563  # NOTE: The run-based selection is done later since
5564  # right now we don't know yet which runs a dataset
5565  # contains.
5566  self.process_dataset_ignore_list()
5567 
5568  # Obtain all required information on the datasets,
5569  # like run numbers and GlobalTags.
5570  self.build_datasets_information()
5571 
5572  if self.use_ref_hists and \
5573  self.ref_hist_mappings_needed():
5574  # Load the dataset name to reference histogram
5575  # name mappings from file.
5576  self.load_ref_hist_mappings()
5577  # Now make sure that for all datasets we want to
5578  # process there is a reference defined. Otherwise
5579  # just bomb out before wasting any more time.
5580  self.check_ref_hist_mappings()
5581  else:
5582  self.logger.info("No need to load reference " \
5583  "histogram mappings file")
5584 
5585  # OBSOLETE OBSOLETE OBSOLETE
5586 ## # TODO TODO TODO
5587 ## # Need to think about where this should go, but
5588 ## # somewhere we have to move over the fact that we want
5589 ## # to process all runs for each dataset that we're
5590 ## # considering. This basically means copying over the
5591 ## # information from self.datasets_information[]["runs"]
5592 ## # to self.datasets_to_use[].
5593 ## for dataset_name in self.datasets_to_use.keys():
5594 ## self.datasets_to_use[dataset_name] = self.datasets_information[dataset_name]["runs"]
5595 ## # TODO TODO TODO end
5596  # OBSOLETE OBSOLETE OBSOLETE end
5597 
5598  self.process_runs_use_and_ignore_lists()
5599 
5600  # If we've been asked to sacrifice some parts of
5601  # spread-out samples in order to be able to partially
5602  # harvest them, we'll do that here.
5603  if self.harvesting_mode == "single-step-allow-partial":
5604  self.singlify_datasets()
5605 
5606  # Check dataset name(s)
5607  self.check_dataset_list()
5608  # and see if there is anything left to do.
5609  if len(self.datasets_to_use) < 1:
5610  self.logger.info("After all checks etc. " \
5611  "there are no datasets (left?) " \
5612  "to process")
5613  else:
5614 
5615  self.logger.info("After all checks etc. we are left " \
5616  "with %d dataset(s) to process " \
5617  "for a total of %d runs" % \
5618  (len(self.datasets_to_use),
5619  sum([len(i) for i in \
5620  self.datasets_to_use.values()])))
5621 
5622  # NOTE: The order in which things are done here is
5623  # important. At the end of the job, independent of
5624  # how it ends (exception, CTRL-C, normal end) the
5625  # book keeping is written to file. At that time it
5626  # should be clear which jobs are done and can be
5627  # submitted. This means we first create the
5628  # general files, and then the per-job config
5629  # files.
5630 
5631  # TODO TODO TODO
5632  # It would be good to modify the book keeping a
5633  # bit. Now we write the crab.cfg (which is the
5634  # same for all samples and runs) and the
5635  # multicrab.cfg (which contains blocks for all
5636  # runs of all samples) without updating our book
5637  # keeping. The only place we update the book
5638  # keeping is after writing the harvesting config
5639  # file for a given dataset. Since there is only
5640  # one single harvesting configuration for each
5641  # dataset, we have no book keeping information on
5642  # a per-run basis.
5643  # TODO TODO TODO end
5644 
5645  # Check if the CASTOR output area exists. If
5646  # necessary create it.
5647  self.create_and_check_castor_dirs()
5648 
5649  # Create one crab and one multicrab configuration
5650  # for all jobs together.
5651  self.write_crab_config()
5652  self.write_multicrab_config()
5653 
5654  # Loop over all datasets and create harvesting
5655  # config files for all of them. One harvesting
5656  # config per dataset is enough. The same file will
5657  # be re-used by CRAB for each run.
5658  # NOTE: We always need a harvesting
5659  # configuration. For the two-step harvesting we
5660  # also need a configuration file for the first
5661  # step: the monitoring element extraction.
5662  for dataset_name in self.datasets_to_use.keys():
5663  try:
5664  self.write_harvesting_config(dataset_name)
5665  if self.harvesting_mode == "two-step":
5666  self.write_me_extraction_config(dataset_name)
5667  except:
5668  # Doh! Just re-raise the damn thing.
5669  raise
5670  else:
5671 ## tmp = self.datasets_information[dataset_name] \
5672 ## ["num_events"]
5673  tmp = {}
5674  for run_number in self.datasets_to_use[dataset_name]:
5675  tmp[run_number] = self.datasets_information \
5676  [dataset_name]["num_events"][run_number]
5677  if self.book_keeping_information. \
5678  has_key(dataset_name):
5679  self.book_keeping_information[dataset_name].update(tmp)
5680  else:
5681  self.book_keeping_information[dataset_name] = tmp
5682 
5683  # Explain to the user what to do now.
5684  self.show_exit_message()
5685 
5686  except Usage, err:
5687  # self.logger.fatal(err.msg)
5688  # self.option_parser.print_help()
5689  pass
5690 
5691  except Error, err:
5692  # self.logger.fatal(err.msg)
5693  exit_code = 1
5694 
5695  except Exception, err:
5696  # Hmmm, ignore keyboard interrupts from the
5697  # user. These are not a `serious problem'. We also
5698  # skip SystemExit, which is the exception thrown when
5699  # one calls sys.exit(). This, for example, is done by
5700  # the option parser after calling print_help(). We
5701  # also have to catch all `no such option'
5702  # complaints. Everything else we catch here is a
5703  # `serious problem'.
5704  if isinstance(err, SystemExit):
5705  self.logger.fatal(err.code)
5706  elif not isinstance(err, KeyboardInterrupt):
5707  self.logger.fatal("!" * 50)
5708  self.logger.fatal(" This looks like a serious problem.")
5709  self.logger.fatal(" If you are sure you followed all " \
5710  "instructions")
5711  self.logger.fatal(" please copy the below stack trace together")
5712  self.logger.fatal(" with a description of what you were doing to")
5713  self.logger.fatal(" jeroen.hegeman@cern.ch.")
5714  self.logger.fatal(" %s" % self.ident_string())
5715  self.logger.fatal("!" * 50)
5716  self.logger.fatal(str(err))
5717  import traceback
5718  traceback_string = traceback.format_exc()
5719  for line in traceback_string.split("\n"):
5720  self.logger.fatal(line)
5721  self.logger.fatal("!" * 50)
5722  exit_code = 2
5723 
5724  # This is the stuff that we should really do, no matter
5725  # what. Of course cleaning up after ourselves is also done
5726  # from this place. This also keeps track of the book keeping
5727  # so far. (This means that if half of the configuration files
5728  # were created before e.g. the disk was full, we should still
5729  # have a consistent book keeping file.)
5730  finally:
5731 
5732  self.cleanup()
5733 
5734  ###
5735 
5736  if self.crab_submission:
5737  os.system("multicrab -create")
5738  os.system("multicrab -submit")
5739 
5740  # End of run.
5741  return exit_code
5742 
5743  # End of CMSHarvester.
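
The per-dataset book-keeping update near the end of run() can be looked at in isolation. What follows is a minimal standalone sketch, not the harvester's own code: the helper name `update_book_keeping` and the sample dataset names and event counts are invented for illustration, and `dict.setdefault` stands in for the `has_key` test, which only exists in Python 2.

```python
def update_book_keeping(book_keeping, dataset_name, runs_to_use, num_events):
    """Merge per-run event counts for one dataset into book_keeping.

    Hypothetical helper mirroring the loop in run(): only the runs that
    are actually selected for harvesting are copied over.
    """
    selected = {run: num_events[run] for run in runs_to_use}
    # setdefault() creates the per-dataset dict on first use, which
    # covers both branches of the has_key()/else test in the listing.
    book_keeping.setdefault(dataset_name, {}).update(selected)
    return book_keeping


# Made-up sample data: one dataset already known, one new.
book = {"/A/B/C": {1: 10}}
update_book_keeping(book, "/A/B/C", [2], {2: 20, 3: 30})
update_book_keeping(book, "/X/Y/Z", [7], {7: 70})
```

Run 3 is left out of the result because it is not in the list of runs to use, which matches the way the listing copies only the entries for `self.datasets_to_use[dataset_name]`.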
5744 
5745 ###########################################################################
5746 ## Main entry point.
5747 ###########################################################################
5748 
5749 if __name__ == "__main__":
5750  "Main entry point for harvesting."
5751 
5752  CMSHarvester().run()
5753 
5754  # Done.
5755 
5756 ###########################################################################
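
The way run() turns exceptions into an exit code can be sketched as a small standalone function. This is an illustration under assumptions, not the harvester's implementation: `run_with_exit_code`, `work` and `cleanup` are hypothetical names, and the `Usage` and `Error` classes below are stand-ins for the helper classes of the same name.

```python
class Usage(Exception):
    """Stand-in for the harvester's Usage helper class."""


class Error(Exception):
    """Stand-in for the harvester's Error helper class."""


def run_with_exit_code(work, cleanup):
    """Run work() and translate its outcome into an exit code.

    0: normal end or usage problem, 1: known Error, 2: anything else.
    cleanup() always runs, mirroring the finally block in run().
    """
    exit_code = 0
    try:
        try:
            work()
        except Usage:
            pass
        except Error:
            exit_code = 1
        except Exception:
            # SystemExit and KeyboardInterrupt derive from BaseException,
            # not Exception, so this clause does not catch them.
            exit_code = 2
    finally:
        cleanup()
    return exit_code
```

In the listing the value returned by run() is not passed on to the shell. Handing it to `sys.exit()` in the main entry point would be one way to expose it, assuming `sys` is imported at the top of the file.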