CMS 3D CMS Logo

cmsHarvester.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 ###########################################################################
4 ## File : cmsHarvest.py
5 ## Authors : Jeroen Hegeman (jeroen.hegeman@cern.ch)
6 ## Niklas Pietsch (niklas.pietsch@desy.de)
7 ## Franseco Costanza (francesco.costanza@desy.de)
8 ## Last change: 20100308
9 
14 
15 """Main program to run all kinds of harvesting.
16 
17 These are the basic kinds of harvesting implemented (contact me if
18 your favourite is missing):
19 
20 - RelVal : Run for release validation samples. Makes heavy use of MC
21  truth information.
22 
23 - RelValFS: FastSim RelVal.
24 
25 - MC : Run for MC samples.
26 
27 - DQMOffline : Run for real data (could also be run for MC).
28 
29 For the mappings of these harvesting types to sequence names please
30 see the setup_harvesting_info() and option_handler_list_types()
31 methods.
32 
33 """
34 
35 ###########################################################################
36 
37 __version__ = "3.8.2p1" # (version jump to match release)
38 __author__ = "Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
39  "Niklas Pietsch (niklas.pietsch@desy.de)"
40 
41 twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
42 
43 ###########################################################################
44 
45 ###########################################################################
46 ## TODO list
47 
88 
89 import os
90 import sys
91 import commands
92 import re
93 import logging
94 import optparse
95 import datetime
96 import copy
97 from inspect import getargspec
98 from random import choice
99 
100 import six
101 
102 # These we need to communicate with DBS global DBSAPI
103 from DBSAPI.dbsApi import DbsApi
104 import DBSAPI.dbsException
106 from functools import reduce
107 # and these we need to parse the DBS output.
108 global xml
109 global SAXParseException
110 import xml.sax
111 from xml.sax import SAXParseException
112 
113 import Configuration.PyReleaseValidation
114 from Configuration.PyReleaseValidation.ConfigBuilder import \
115  ConfigBuilder, defaultOptions
116 # from Configuration.PyReleaseValidation.cmsDriverOptions import options, python_config_filename
117 
118 #import FWCore.ParameterSet.Config as cms
119 
120 # Debugging stuff.
121 import pdb
122 try:
123  import debug_hook
124 except ImportError:
125  pass
126 
127 ###########################################################################
128 ## Helper class: Usage exception.
129 ###########################################################################
131  def __init__(self, msg):
132  self.msg = msg
133  def __str__(self):
134  return repr(self.msg)
135 
136  # End of Usage.
137 
138 ###########################################################################
139 ## Helper class: Error exception.
140 ###########################################################################
142  def __init__(self, msg):
143  self.msg = msg
144  def __str__(self):
145  return repr(self.msg)
146 
147 ###########################################################################
148 ## Helper class: CMSHarvesterHelpFormatter.
149 ###########################################################################
150 
151 class CMSHarvesterHelpFormatter(optparse.IndentedHelpFormatter):
152  """Helper class to add some customised help output to cmsHarvester.
153 
154  We want to add some instructions, as well as a pointer to the CMS
155  Twiki.
156 
157  """
158 
159  def format_usage(self, usage):
160 
161  usage_lines = []
162 
163  sep_line = "-" * 60
164  usage_lines.append(sep_line)
165  usage_lines.append("Welcome to the CMS harvester, a (hopefully useful)")
166  usage_lines.append("tool to create harvesting configurations.")
167  usage_lines.append("For more information please have a look at the CMS Twiki:")
168  usage_lines.append(" %s" % twiki_url)
169  usage_lines.append(sep_line)
170  usage_lines.append("")
171 
172  # Since we only add to the output, we now just append the
173  # original output from IndentedHelpFormatter.
174  usage_lines.append(optparse.IndentedHelpFormatter. \
175  format_usage(self, usage))
176 
177  formatted_usage = "\n".join(usage_lines)
178  return formatted_usage
179 
180  # End of CMSHarvesterHelpFormatter.
181 
182 ###########################################################################
183 ## Helper class: DBSXMLHandler.
184 ###########################################################################
185 
186 class DBSXMLHandler(xml.sax.handler.ContentHandler):
187  """XML handler class to parse DBS results.
188 
189  The tricky thing here is that older DBS versions (2.0.5 and
190  earlier) return results in a different XML format than newer
191  versions. Previously the result values were returned as attributes
192  to the `result' element. The new approach returns result values as
193  contents of named elements.
194 
195  The old approach is handled directly in startElement(), the new
196  approach in characters().
197 
198  NOTE: All results are returned in the form of string values of
199  course!
200 
201  """
202 
203  # This is the required mapping from the name of the variable we
204  # ask for to what we call it ourselves. (Effectively this is the
205  # mapping between the old attribute key name and the new element
206  # name.)
207  mapping = {
208  "dataset" : "PATH",
209  "dataset.tag" : "PROCESSEDDATASET_GLOBALTAG",
210  "datatype.type" : "PRIMARYDSTYPE_TYPE",
211  "run" : "RUNS_RUNNUMBER",
212  "run.number" : "RUNS_RUNNUMBER",
213  "file.name" : "FILES_LOGICALFILENAME",
214  "file.numevents" : "FILES_NUMBEROFEVENTS",
215  "algo.version" : "APPVERSION_VERSION",
216  "site" : "STORAGEELEMENT_SENAME",
217  }
218 
219  def __init__(self, tag_names):
220  # This is a list used as stack to keep track of where we are
221  # in the element tree.
223  self.tag_names = tag_names
224  self.results = {}
225 
226  def startElement(self, name, attrs):
227  self.element_position.append(name)
228 
229  self.current_value = []
230 
231  #----------
232 
233  # This is to catch results from DBS 2.0.5 and earlier.
234  if name == "result":
235  for name in self.tag_names:
236  key = DBSXMLHandler.mapping[name]
237  value = str(attrs[key])
238  try:
239  self.results[name].append(value)
240  except KeyError:
241  self.results[name] = [value]
242 
243  #----------
244 
245  def endElement(self, name):
246  assert self.current_element() == name, \
247  "closing unopenend element `%s'" % name
248 
249  if self.current_element() in self.tag_names:
250  contents = "".join(self.current_value)
251  if self.current_element() in self.results:
252  self.results[self.current_element()].append(contents)
253  else:
254  self.results[self.current_element()] = [contents]
255 
256  self.element_position.pop()
257 
258  def characters(self, content):
259  # NOTE: It is possible that the actual contents of the tag
260  # gets split into multiple pieces. This method will be called
261  # for each of the pieces. This means we have to concatenate
262  # everything ourselves.
263  if self.current_element() in self.tag_names:
264  self.current_value.append(content)
265 
266  def current_element(self):
267  return self.element_position[-1]
268 
270  """Make sure that all results arrays have equal length.
271 
272  We should have received complete rows from DBS. I.e. all
273  results arrays in the handler should be of equal length.
274 
275  """
276 
277  results_valid = True
278 
279  res_names = self.results.keys()
280  if len(res_names) > 1:
281  for res_name in res_names[1:]:
282  res_tmp = self.results[res_name]
283  if len(res_tmp) != len(self.results[res_names[0]]):
284  results_valid = False
285 
286  return results_valid
287 
288  # End of DBSXMLHandler.
289 
290 ###########################################################################
291 ## CMSHarvester class.
292 ###########################################################################
293 
295  """Class to perform CMS harvesting.
296 
297  More documentation `obviously' to follow.
298 
299  """
300 
301  ##########
302 
303  def __init__(self, cmd_line_opts=None):
304  "Initialize class and process command line options."
305 
306  self.version = __version__
307 
308  # These are the harvesting types allowed. See the head of this
309  # file for more information.
311  "RelVal",
312  "RelValFS",
313  "MC",
314  "DQMOffline",
315  ]
316 
317  # These are the possible harvesting modes:
318  # - Single-step: harvesting takes place on-site in a single
319  # step. For each samples only a single ROOT file containing
320  # the harvesting results is returned.
321  # - Single-step-allow-partial: special hack to allow
322  # harvesting of partial statistics using single-step
323  # harvesting on spread-out samples.
324  # - Two-step: harvesting takes place in two steps. The first
325  # step returns a series of monitoring elenent summaries for
326  # each sample. The second step then merges these summaries
327  # locally and does the real harvesting. This second step
328  # produces the ROOT file containing the harvesting results.
330  "single-step",
331  "single-step-allow-partial",
332  "two-step"
333  ]
334 
335  # It is possible to specify a GlobalTag that will override any
336  # choices (regarding GlobalTags) made by the cmsHarvester.
337  # BUG BUG BUG
338  # For the moment, until I figure out a way to obtain the
339  # GlobalTag with which a given data (!) dataset was created,
340  # it is necessary to specify a GlobalTag for harvesting of
341  # data.
342  # BUG BUG BUG end
343  self.globaltag = None
344 
345  # It's also possible to switch off the use of reference
346  # histograms altogether.
347  self.use_ref_hists = True
348 
349  # The database name and account are hard-coded. They are not
350  # likely to change before the end-of-life of this tool. But of
351  # course there is a way to override this from the command
352  # line. One can even override the Frontier connection used for
353  # the GlobalTag and for the reference histograms
354  # independently. Please only use this for testing purposes.
356  self.frontier_connection_name["globaltag"] = "frontier://" \
357  "FrontierProd/"
358  self.frontier_connection_name["refhists"] = "frontier://" \
359  "FrontierProd/"
361  for key in self.frontier_connection_name.keys():
362  self.frontier_connection_overridden[key] = False
363 
364  # This contains information specific to each of the harvesting
365  # types. Used to create the harvesting configuration. It is
366  # filled by setup_harvesting_info().
367  self.harvesting_info = None
368 
369  ###
370 
371  # These are default `unused' values that will be filled in
372  # depending on the command line options.
373 
374  # The type of harvesting we're doing. See
375  # self.harvesting_types for allowed types.
376  self.harvesting_type = None
377 
378  # The harvesting mode, popularly known as single-step
379  # vs. two-step. The thing to remember at this point is that
380  # single-step is only possible for samples located completely
381  # at a single site (i.e. SE).
382  self.harvesting_mode = None
383  # BUG BUG BUG
384  # Default temporarily set to two-step until we can get staged
385  # jobs working with CRAB.
386  self.harvesting_mode_default = "single-step"
387  # BUG BUG BUG end
388 
389  # The input method: are we reading a dataset name (or regexp)
390  # directly from the command line or are we reading a file
391  # containing a list of dataset specifications. Actually we
392  # keep one of each for both datasets and runs.
393  self.input_method = {}
394  self.input_method["datasets"] = {}
395  self.input_method["datasets"]["use"] = None
396  self.input_method["datasets"]["ignore"] = None
397  self.input_method["runs"] = {}
398  self.input_method["runs"]["use"] = None
399  self.input_method["runs"]["ignore"] = None
400  self.input_method["runs"]["ignore"] = None
401  # The name of whatever input we're using.
402  self.input_name = {}
403  self.input_name["datasets"] = {}
404  self.input_name["datasets"]["use"] = None
405  self.input_name["datasets"]["ignore"] = None
406  self.input_name["runs"] = {}
407  self.input_name["runs"]["use"] = None
408  self.input_name["runs"]["ignore"] = None
409 
410  self.Jsonlumi = False
411  self.Jsonfilename = "YourJSON.txt"
412  self.Jsonrunfilename = "YourJSON.txt"
413  self.todofile = "YourToDofile.txt"
414 
415  # If this is true, we're running in `force mode'. In this case
416  # the sanity checks are performed but failure will not halt
417  # everything.
418  self.force_running = None
419 
420  # The base path of the output dir in CASTOR.
421  self.castor_base_dir = None
422  self.castor_base_dir_default = "/castor/cern.ch/" \
423  "cms/store/temp/" \
424  "dqm/offline/harvesting_output/"
425 
426  # The name of the file to be used for book keeping: which
427  # datasets, runs, etc. we have already processed.
428  self.book_keeping_file_name = None
429  self.book_keeping_file_name_default = "harvesting_accounting.txt"
430 
431  # The dataset name to reference histogram name mapping is read
432  # from a text file. The name of this file is kept in the
433  # following variable.
434  self.ref_hist_mappings_file_name = None
435  # And this is the default value.
436  self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"
437 
438  # Hmmm, hard-coded prefix of the CERN CASTOR area. This is the
439  # only supported CASTOR area.
440  # NOTE: Make sure this one starts with a `/'.
441  self.castor_prefix = "/castor/cern.ch"
442 
443  # Normally the central harvesting should be done using the
444  # `t1access' grid role. To be able to run without T1 access
445  # the --no-t1access flag can be used. This variable keeps
446  # track of that special mode.
447  self.non_t1access = False
448  self.caf_access = False
449  self.saveByLumiSection = False
450  self.crab_submission = False
451  self.nr_max_sites = 1
452 
453  self.preferred_site = "no preference"
454 
455  # This will become the list of datasets and runs to consider
456  self.datasets_to_use = {}
457  # and this will become the list of datasets and runs to skip.
458  self.datasets_to_ignore = {}
459  # This, in turn, will hold all book keeping information.
460  self.book_keeping_information = {}
461  # And this is where the dataset name to reference histogram
462  # name mapping is stored.
463  self.ref_hist_mappings = {}
464 
465  # We're now also allowing run selection. This means we also
466  # have to keep list of runs requested and vetoed by the user.
467  self.runs_to_use = {}
468  self.runs_to_ignore = {}
469 
470  # Cache for CMSSW version availability at different sites.
471  self.sites_and_versions_cache = {}
472 
473  # Cache for checked GlobalTags.
474  self.globaltag_check_cache = []
475 
476  # Global flag to see if there were any jobs for which we could
477  # not find a matching site.
478  self.all_sites_found = True
479 
480  # Helper string centrally defined.
481  self.no_matching_site_found_str = "no_matching_site_found"
482 
483  # Store command line options for later use.
484  if cmd_line_opts is None:
485  cmd_line_opts = sys.argv[1:]
486  self.cmd_line_opts = cmd_line_opts
487 
488  # Set up the logger.
489  log_handler = logging.StreamHandler()
490  # This is the default log formatter, the debug option switches
491  # on some more information.
492  log_formatter = logging.Formatter("%(message)s")
493  log_handler.setFormatter(log_formatter)
494  logger = logging.getLogger()
495  logger.name = "main"
496  logger.addHandler(log_handler)
497  self.logger = logger
498  # The default output mode is quite verbose.
499  self.set_output_level("NORMAL")
500 
501  #logger.debug("Initialized successfully")
502 
503  # End of __init__.
504 
505  ##########
506 
507  def cleanup(self):
508  "Clean up after ourselves."
509 
510  # NOTE: This is the safe replacement of __del__.
511 
512  #self.logger.debug("All done -> shutting down")
513  logging.shutdown()
514 
515  # End of cleanup.
516 
517  ##########
518 
519  def time_stamp(self):
520  "Create a timestamp to use in the created config files."
521 
522  time_now = datetime.datetime.utcnow()
523  # We don't care about the microseconds.
524  time_now = time_now.replace(microsecond = 0)
525  time_stamp = "%sUTC" % datetime.datetime.isoformat(time_now)
526 
527  # End of time_stamp.
528  return time_stamp
529 
530  ##########
531 
532  def ident_string(self):
533  "Spit out an identification string for cmsHarvester.py."
534 
535  ident_str = "`cmsHarvester.py " \
536  "version %s': cmsHarvester.py %s" % \
537  (__version__,
538  reduce(lambda x, y: x+' '+y, sys.argv[1:]))
539 
540  return ident_str
541 
542  ##########
543 
544  def format_conditions_string(self, globaltag):
545  """Create the conditions string needed for `cmsDriver'.
546 
547  Just glueing the FrontierConditions bit in front of it really.
548 
549  """
550 
551  # Not very robust but okay. The idea is that if the user
552  # specified (since this cannot happen with GlobalTags coming
553  # from DBS) something containing `conditions', they probably
554  # know what they're doing and we should not muck things up. In
555  # all other cases we just assume we only received the
556  # GlobalTag part and we built the usual conditions string from
557  # that .
558  if globaltag.lower().find("conditions") > -1:
559  conditions_string = globaltag
560  else:
561  conditions_string = "FrontierConditions_GlobalTag,%s" % \
562  globaltag
563 
564  # End of format_conditions_string.
565  return conditions_string
566 
567  ##########
568 
570  """Return the database account name used to store the GlobalTag.
571 
572  The name of the database account depends (albeit weakly) on
573  the CMSSW release version.
574 
575  """
576 
577  # This never changed, unlike the cms_cond_31X_DQM_SUMMARY ->
578  # cms_cond_34X_DQM transition.
579  account_name = "CMS_COND_31X_GLOBALTAG"
580 
581  # End of db_account_name_cms_cond_globaltag.
582  return account_name
583 
584  ##########
585 
587  """See db_account_name_cms_cond_globaltag."""
588 
589  account_name = None
590  version = self.cmssw_version[6:11]
591  if version < "3_4_0":
592  account_name = "CMS_COND_31X_DQM_SUMMARY"
593  else:
594  account_name = "CMS_COND_34X"
595 
596  # End of db_account_name_cms_cond_dqm_summary.
597  return account_name
598 
599  ##########
600 
602  "Create a nice header to be used to mark the generated files."
603 
604  tmp = []
605 
606  time_stamp = self.time_stamp()
607  ident_str = self.ident_string()
608  tmp.append("# %s" % time_stamp)
609  tmp.append("# WARNING: This file was created automatically!")
610  tmp.append("")
611  tmp.append("# Created by %s" % ident_str)
612 
613  header = "\n".join(tmp)
614 
615  # End of config_file_header.
616  return header
617 
618  ##########
619 
620  def set_output_level(self, output_level):
621  """Adjust the level of output generated.
622 
623  Choices are:
624  - normal : default level of output
625  - quiet : less output than the default
626  - verbose : some additional information
627  - debug : lots more information, may be overwhelming
628 
629  NOTE: The debug option is a bit special in the sense that it
630  also modifies the output format.
631 
632  """
633 
634  # NOTE: These levels are hooked up to the ones used in the
635  # logging module.
636  output_levels = {
637  "NORMAL" : logging.INFO,
638  "QUIET" : logging.WARNING,
639  "VERBOSE" : logging.INFO,
640  "DEBUG" : logging.DEBUG
641  }
642 
643  output_level = output_level.upper()
644 
645  try:
646  # Update the logger.
647  self.log_level = output_levels[output_level]
648  self.logger.setLevel(self.log_level)
649  except KeyError:
650  # Show a complaint
651  self.logger.fatal("Unknown output level `%s'" % ouput_level)
652  # and re-raise an exception.
653  raise Exception
654 
655  # End of set_output_level.
656 
657  ##########
658 
659  def option_handler_debug(self, option, opt_str, value, parser):
660  """Switch to debug mode.
661 
662  This both increases the amount of output generated, as well as
663  changes the format used (more detailed information is given).
664 
665  """
666 
667  # Switch to a more informative log formatter for debugging.
668  log_formatter_debug = logging.Formatter("[%(levelname)s] " \
669  # NOTE: funcName was
670  # only implemented
671  # starting with python
672  # 2.5.
673  #"%(funcName)s() " \
674  #"@%(filename)s:%(lineno)d " \
675  "%(message)s")
676  # Hmmm, not very nice. This assumes there's only a single
677  # handler associated with the current logger.
678  log_handler = self.logger.handlers[0]
679  log_handler.setFormatter(log_formatter_debug)
680  self.set_output_level("DEBUG")
681 
682  # End of option_handler_debug.
683 
684  ##########
685 
686  def option_handler_quiet(self, option, opt_str, value, parser):
687  "Switch to quiet mode: less verbose."
688 
689  self.set_output_level("QUIET")
690 
691  # End of option_handler_quiet.
692 
693  ##########
694 
695  def option_handler_force(self, option, opt_str, value, parser):
696  """Switch on `force mode' in which case we don't brake for nobody.
697 
698  In so-called `force mode' all sanity checks are performed but
699  we don't halt on failure. Of course this requires some care
700  from the user.
701 
702  """
703 
704  self.logger.debug("Switching on `force mode'.")
705  self.force_running = True
706 
707  # End of option_handler_force.
708 
709  ##########
710 
711  def option_handler_harvesting_type(self, option, opt_str, value, parser):
712  """Set the harvesting type to be used.
713 
714  This checks that no harvesting type is already set, and sets
715  the harvesting type to be used to the one specified. If a
716  harvesting type is already set an exception is thrown. The
717  same happens when an unknown type is specified.
718 
719  """
720 
721  # Check for (in)valid harvesting types.
722  # NOTE: The matching is done in a bit of a complicated
723  # way. This allows the specification of the type to be
724  # case-insensitive while still ending up with the properly
725  # `cased' version afterwards.
726  value = value.lower()
727  harvesting_types_lowered = [i.lower() for i in self.harvesting_types]
728  try:
729  type_index = harvesting_types_lowered.index(value)
730  # If this works, we now have the index to the `properly
731  # cased' version of the harvesting type.
732  except ValueError:
733  self.logger.fatal("Unknown harvesting type `%s'" % \
734  value)
735  self.logger.fatal(" possible types are: %s" %
736  ", ".join(self.harvesting_types))
737  raise Usage("Unknown harvesting type `%s'" % \
738  value)
739 
740  # Check if multiple (by definition conflicting) harvesting
741  # types are being specified.
742  if not self.harvesting_type is None:
743  msg = "Only one harvesting type should be specified"
744  self.logger.fatal(msg)
745  raise Usage(msg)
746  self.harvesting_type = self.harvesting_types[type_index]
747 
748  self.logger.info("Harvesting type to be used: `%s'" % \
749  self.harvesting_type)
750 
751  # End of option_handler_harvesting_type.
752 
753  ##########
754 
755  def option_handler_harvesting_mode(self, option, opt_str, value, parser):
756  """Set the harvesting mode to be used.
757 
758  Single-step harvesting can be used for samples that are
759  located completely at a single site (= SE). Otherwise use
760  two-step mode.
761 
762  """
763 
764  # Check for valid mode.
765  harvesting_mode = value.lower()
766  if not harvesting_mode in self.harvesting_modes:
767  msg = "Unknown harvesting mode `%s'" % harvesting_mode
768  self.logger.fatal(msg)
769  self.logger.fatal(" possible modes are: %s" % \
770  ", ".join(self.harvesting_modes))
771  raise Usage(msg)
772 
773  # Check if we've been given only a single mode, otherwise
774  # complain.
775  if not self.harvesting_mode is None:
776  msg = "Only one harvesting mode should be specified"
777  self.logger.fatal(msg)
778  raise Usage(msg)
779  self.harvesting_mode = harvesting_mode
780 
781  self.logger.info("Harvesting mode to be used: `%s'" % \
782  self.harvesting_mode)
783 
784  # End of option_handler_harvesting_mode.
785 
786  ##########
787 
788  def option_handler_globaltag(self, option, opt_str, value, parser):
789  """Set the GlobalTag to be used, overriding our own choices.
790 
791  By default the cmsHarvester will use the GlobalTag with which
792  a given dataset was created also for the harvesting. The
793  --globaltag option is the way to override this behaviour.
794 
795  """
796 
797  # Make sure that this flag only occurred once.
798  if not self.globaltag is None:
799  msg = "Only one GlobalTag should be specified"
800  self.logger.fatal(msg)
801  raise Usage(msg)
802  self.globaltag = value
803 
804  self.logger.info("GlobalTag to be used: `%s'" % \
805  self.globaltag)
806 
807  # End of option_handler_globaltag.
808 
809  ##########
810 
811  def option_handler_no_ref_hists(self, option, opt_str, value, parser):
812  "Switch use of all reference histograms off."
813 
814  self.use_ref_hists = False
815 
816  self.logger.warning("Switching off all use of reference histograms")
817 
818  # End of option_handler_no_ref_hists.
819 
820  ##########
821 
822  def option_handler_frontier_connection(self, option, opt_str,
823  value, parser):
824  """Override the default Frontier connection string.
825 
826  Please only use this for testing (e.g. when a test payload has
827  been inserted into cms_orc_off instead of cms_orc_on).
828 
829  This method gets called for three different command line
830  options:
831  - --frontier-connection,
832  - --frontier-connection-for-globaltag,
833  - --frontier-connection-for-refhists.
834  Appropriate care has to be taken to make sure things are only
835  specified once.
836 
837  """
838 
839  # Figure out with which command line option we've been called.
840  frontier_type = opt_str.split("-")[-1]
841  if frontier_type == "connection":
842  # Main option: change all connection strings.
843  frontier_types = self.frontier_connection_name.keys()
844  else:
845  frontier_types = [frontier_type]
846 
847  # Make sure that each Frontier connection is specified only
848  # once. (Okay, in a bit of a dodgy way...)
849  for connection_name in frontier_types:
850  if self.frontier_connection_overridden[connection_name] == True:
851  msg = "Please specify either:\n" \
852  " `--frontier-connection' to change the " \
853  "Frontier connection used for everything, or\n" \
854  "either one or both of\n" \
855  " `--frontier-connection-for-globaltag' to " \
856  "change the Frontier connection used for the " \
857  "GlobalTag and\n" \
858  " `--frontier-connection-for-refhists' to change " \
859  "the Frontier connection used for the " \
860  "reference histograms."
861  self.logger.fatal(msg)
862  raise Usage(msg)
863 
864  frontier_prefix = "frontier://"
865  if not value.startswith(frontier_prefix):
866  msg = "Expecting Frontier connections to start with " \
867  "`%s'. You specified `%s'." % \
868  (frontier_prefix, value)
869  self.logger.fatal(msg)
870  raise Usage(msg)
871  # We also kind of expect this to be either FrontierPrep or
872  # FrontierProd (but this is just a warning).
873  if value.find("FrontierProd") < 0 and \
874  value.find("FrontierProd") < 0:
875  msg = "Expecting Frontier connections to contain either " \
876  "`FrontierProd' or `FrontierPrep'. You specified " \
877  "`%s'. Are you sure?" % \
878  value
879  self.logger.warning(msg)
880 
881  if not value.endswith("/"):
882  value += "/"
883 
884  for connection_name in frontier_types:
885  self.frontier_connection_name[connection_name] = value
886  self.frontier_connection_overridden[connection_name] = True
887 
888  frontier_type_str = "unknown"
889  if connection_name == "globaltag":
890  frontier_type_str = "the GlobalTag"
891  elif connection_name == "refhists":
892  frontier_type_str = "the reference histograms"
893 
894  self.logger.warning("Overriding default Frontier " \
895  "connection for %s " \
896  "with `%s'" % \
897  (frontier_type_str,
898  self.frontier_connection_name[connection_name]))
899 
900  # End of option_handler_frontier_connection
901 
902  ##########
903 
904  def option_handler_input_todofile(self, option, opt_str, value, parser):
905 
906  self.todofile = value
907  # End of option_handler_input_todofile.
908 
909  ##########
910 
911  def option_handler_input_Jsonfile(self, option, opt_str, value, parser):
912 
913  self.Jsonfilename = value
914  # End of option_handler_input_Jsonfile.
915 
916  ##########
917 
918  def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser):
919 
920  self.Jsonrunfilename = value
921  # End of option_handler_input_Jsonrunfile.
922 
923  ##########
924 
925  def option_handler_input_spec(self, option, opt_str, value, parser):
926  """TODO TODO TODO
927  Document this...
928 
929  """
930 
931  # Figure out if we were called for the `use these' or the
932  # `ignore these' case.
933  if opt_str.lower().find("ignore") > -1:
934  spec_type = "ignore"
935  else:
936  spec_type = "use"
937 
938  # Similar: are we being called for datasets or for runs?
939  if opt_str.lower().find("dataset") > -1:
940  select_type = "datasets"
941  else:
942  select_type = "runs"
943 
944  if not self.input_method[select_type][spec_type] is None:
945  msg = "Please only specify one input method " \
946  "(for the `%s' case)" % opt_str
947  self.logger.fatal(msg)
948  raise Usage(msg)
949 
950  input_method = opt_str.replace("-", "").replace("ignore", "")
951  self.input_method[select_type][spec_type] = input_method
952  self.input_name[select_type][spec_type] = value
953 
954  self.logger.debug("Input method for the `%s' case: %s" % \
955  (spec_type, input_method))
956 
957  # End of option_handler_input_spec
958 
959  ##########
960 
961  def option_handler_book_keeping_file(self, option, opt_str, value, parser):
962  """Store the name of the file to be used for book keeping.
963 
964  The only check done here is that only a single book keeping
965  file is specified.
966 
967  """
968 
969  file_name = value
970 
971  if not self.book_keeping_file_name is None:
972  msg = "Only one book keeping file should be specified"
973  self.logger.fatal(msg)
974  raise Usage(msg)
975  self.book_keeping_file_name = file_name
976 
977  self.logger.info("Book keeping file to be used: `%s'" % \
979 
980  # End of option_handler_book_keeping_file.
981 
982  ##########
983 
984  def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser):
985  """Store the name of the file for the ref. histogram mapping.
986 
987  """
988 
989  file_name = value
990 
991  if not self.ref_hist_mappings_file_name is None:
992  msg = "Only one reference histogram mapping file " \
993  "should be specified"
994  self.logger.fatal(msg)
995  raise Usage(msg)
997 
998  self.logger.info("Reference histogram mapping file " \
999  "to be used: `%s'" % \
1001 
1002  # End of option_handler_ref_hist_mapping_file.
1003 
1004  ##########
1005 
1006  # OBSOLETE OBSOLETE OBSOLETE
1007 
1008 ## def option_handler_dataset_name(self, option, opt_str, value, parser):
1009 ## """Specify the name(s) of the dataset(s) to be processed.
1010 
1011 ## It is checked to make sure that no dataset name or listfile
1012 ## names are given yet. If all is well (i.e. we still have a
1013 ## clean slate) the dataset name is stored for later use,
1014 ## otherwise a Usage exception is raised.
1015 
1016 ## """
1017 
1018 ## if not self.input_method is None:
1019 ## if self.input_method == "dataset":
1020 ## raise Usage("Please only feed me one dataset specification")
1021 ## elif self.input_method == "listfile":
1022 ## raise Usage("Cannot specify both dataset and input list file")
1023 ## else:
1024 ## assert False, "Unknown input method `%s'" % self.input_method
1025 ## self.input_method = "dataset"
1026 ## self.input_name = value
1027 ## self.logger.info("Input method used: %s" % self.input_method)
1028 
1029 ## # End of option_handler_dataset_name.
1030 
1031 ## ##########
1032 
1033 ## def option_handler_listfile_name(self, option, opt_str, value, parser):
1034 ## """Specify the input list file containing datasets to be processed.
1035 
1036 ## It is checked to make sure that no dataset name or listfile
1037 ## names are given yet. If all is well (i.e. we still have a
1038 ## clean slate) the listfile name is stored for later use,
1039 ## otherwise a Usage exception is raised.
1040 
1041 ## """
1042 
1043 ## if not self.input_method is None:
1044 ## if self.input_method == "listfile":
1045 ## raise Usage("Please only feed me one list file")
1046 ## elif self.input_method == "dataset":
1047 ## raise Usage("Cannot specify both dataset and input list file")
1048 ## else:
1049 ## assert False, "Unknown input method `%s'" % self.input_method
1050 ## self.input_method = "listfile"
1051 ## self.input_name = value
1052 ## self.logger.info("Input method used: %s" % self.input_method)
1053 
1054 ## # End of option_handler_listfile_name.
1055 
1056  # OBSOLETE OBSOLETE OBSOLETE end
1057 
1058  ##########
1059 
1060  def option_handler_castor_dir(self, option, opt_str, value, parser):
1061  """Specify where on CASTOR the output should go.
1062 
1063  At the moment only output to CERN CASTOR is
1064  supported. Eventually the harvested results should go into the
1065  central place for DQM on CASTOR anyway.
1066 
1067  """
1068 
1069  # Check format of specified CASTOR area.
1070  castor_dir = value
1071  #castor_dir = castor_dir.lstrip(os.path.sep)
1072  castor_prefix = self.castor_prefix
1073 
1074  # Add a leading slash if necessary and clean up the path.
1075  castor_dir = os.path.join(os.path.sep, castor_dir)
1076  self.castor_base_dir = os.path.normpath(castor_dir)
1077 
1078  self.logger.info("CASTOR (base) area to be used: `%s'" % \
1079  self.castor_base_dir)
1080 
1081  # End of option_handler_castor_dir.
1082 
1083  ##########
1084 
1085  def option_handler_no_t1access(self, option, opt_str, value, parser):
1086  """Set the self.no_t1access flag to try and create jobs that
1087  run without special `t1access' role.
1088 
1089  """
1090 
1091  self.non_t1access = True
1092 
1093  self.logger.warning("Running in `non-t1access' mode. " \
1094  "Will try to create jobs that run " \
1095  "without special rights but no " \
1096  "further promises...")
1097 
1098  # End of option_handler_no_t1access.
1099 
1100  ##########
1101 
1102  def option_handler_caf_access(self, option, opt_str, value, parser):
1103  """Set the self.caf_access flag to try and create jobs that
1104  run on the CAF.
1105 
1106  """
1107  self.caf_access = True
1108 
1109  self.logger.warning("Running in `caf_access' mode. " \
1110  "Will try to create jobs that run " \
1111  "on CAF but no" \
1112  "further promises...")
1113 
1114  # End of option_handler_caf_access.
1115 
1116  ##########
1117 
1118  def option_handler_saveByLumiSection(self, option, opt_str, value, parser):
1119  """Set process.dqmSaver.saveByLumiSectiont=1 in cfg harvesting file
1120  """
1121  self.saveByLumiSection = True
1122 
1123  self.logger.warning("waning concerning saveByLumiSection option")
1124 
1125  # End of option_handler_saveByLumiSection.
1126 
1127 
1128  ##########
1129 
1130  def option_handler_crab_submission(self, option, opt_str, value, parser):
1131  """Crab jobs are not created and
1132  "submitted automatically",
1133  """
1134  self.crab_submission = True
1135 
1136  # End of option_handler_crab_submission.
1137 
1138  ##########
1139 
1140  def option_handler_sites(self, option, opt_str, value, parser):
1141 
1142  self.nr_max_sites = value
1143 
1144  ##########
1145 
1146  def option_handler_preferred_site(self, option, opt_str, value, parser):
1147 
1148  self.preferred_site = value
1149 
1150  ##########
1151 
1152  def option_handler_list_types(self, option, opt_str, value, parser):
1153  """List all harvesting types and their mappings.
1154 
1155  This lists all implemented harvesting types with their
1156  corresponding mappings to sequence names. This had to be
1157  separated out from the help since it depends on the CMSSW
1158  version and was making things a bit of a mess.
1159 
1160  NOTE: There is no way (at least not that I could come up with)
1161  to code this in a neat generic way that can be read both by
1162  this method and by setup_harvesting_info(). Please try hard to
1163  keep these two methods in sync!
1164 
1165  """
1166 
1167  sep_line = "-" * 50
1168  sep_line_short = "-" * 20
1169 
1170  print sep_line
1171  print "The following harvesting types are available:"
1172  print sep_line
1173 
1174  print "`RelVal' maps to:"
1175  print " pre-3_3_0 : HARVESTING:validationHarvesting"
1176  print " 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting"
1177  print " Exceptions:"
1178  print " 3_3_0_pre1-4 : HARVESTING:validationHarvesting"
1179  print " 3_3_0_pre6 : HARVESTING:validationHarvesting"
1180  print " 3_4_0_pre1 : HARVESTING:validationHarvesting"
1181 
1182  print sep_line_short
1183 
1184  print "`RelValFS' maps to:"
1185  print " always : HARVESTING:validationHarvestingFS"
1186 
1187  print sep_line_short
1188 
1189  print "`MC' maps to:"
1190  print " always : HARVESTING:validationprodHarvesting"
1191 
1192  print sep_line_short
1193 
1194  print "`DQMOffline' maps to:"
1195  print " always : HARVESTING:dqmHarvesting"
1196 
1197  print sep_line
1198 
1199  # We're done, let's quit. (This is the same thing optparse
1200  # does after printing the help.)
1201  raise SystemExit
1202 
1203  # End of option_handler_list_types.
1204 
1205  ##########
1206 
1208  """Fill our dictionary with all info needed to understand
1209  harvesting.
1210 
1211  This depends on the CMSSW version since at some point the
1212  names and sequences were modified.
1213 
1214  NOTE: There is no way (at least not that I could come up with)
1215  to code this in a neat generic way that can be read both by
1216  this method and by option_handler_list_types(). Please try
1217  hard to keep these two methods in sync!
1218 
1219  """
1220 
1221  assert not self.cmssw_version is None, \
1222  "ERROR setup_harvesting() requires " \
1223  "self.cmssw_version to be set!!!"
1224 
1225  harvesting_info = {}
1226 
1227  # This is the version-independent part.
1228  harvesting_info["DQMOffline"] = {}
1229  harvesting_info["DQMOffline"]["beamspot"] = None
1230  harvesting_info["DQMOffline"]["eventcontent"] = None
1231  harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"
1232 
1233  harvesting_info["RelVal"] = {}
1234  harvesting_info["RelVal"]["beamspot"] = None
1235  harvesting_info["RelVal"]["eventcontent"] = None
1236  harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"
1237 
1238  harvesting_info["RelValFS"] = {}
1239  harvesting_info["RelValFS"]["beamspot"] = None
1240  harvesting_info["RelValFS"]["eventcontent"] = None
1241  harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"
1242 
1243  harvesting_info["MC"] = {}
1244  harvesting_info["MC"]["beamspot"] = None
1245  harvesting_info["MC"]["eventcontent"] = None
1246  harvesting_info["MC"]["harvesting"] = "AtRunEnd"
1247 
1248  # This is the version-dependent part. And I know, strictly
1249  # speaking it's not necessary to fill in all three types since
1250  # in a single run we'll only use one type anyway. This does
1251  # look more readable, however, and required less thought from
1252  # my side when I put this together.
1253 
1254  # DEBUG DEBUG DEBUG
1255  # Check that we understand our own version naming.
1256  assert self.cmssw_version.startswith("CMSSW_")
1257  # DEBUG DEBUG DEBUG end
1258 
1259  version = self.cmssw_version[6:]
1260 
1261  #----------
1262 
1263  # RelVal
1264  step_string = None
1265  if version < "3_3_0":
1266  step_string = "validationHarvesting"
1267  elif version in ["3_3_0_pre1", "3_3_0_pre2",
1268  "3_3_0_pre3", "3_3_0_pre4",
1269  "3_3_0_pre6", "3_4_0_pre1"]:
1270  step_string = "validationHarvesting"
1271  else:
1272  step_string = "validationHarvesting+dqmHarvesting"
1273 
1274  harvesting_info["RelVal"]["step_string"] = step_string
1275 
1276  # DEBUG DEBUG DEBUG
1277  # Let's make sure we found something.
1278  assert not step_string is None, \
1279  "ERROR Could not decide a RelVal harvesting sequence " \
1280  "for CMSSW version %s" % self.cmssw_version
1281  # DEBUG DEBUG DEBUG end
1282 
1283  #----------
1284 
1285  # RelVal
1286  step_string = "validationHarvestingFS"
1287 
1288  harvesting_info["RelValFS"]["step_string"] = step_string
1289 
1290  #----------
1291 
1292  # MC
1293  step_string = "validationprodHarvesting"
1294 
1295  harvesting_info["MC"]["step_string"] = step_string
1296 
1297  # DEBUG DEBUG DEBUG
1298  # Let's make sure we found something.
1299  assert not step_string is None, \
1300  "ERROR Could not decide a MC harvesting " \
1301  "sequence for CMSSW version %s" % self.cmssw_version
1302  # DEBUG DEBUG DEBUG end
1303 
1304  #----------
1305 
1306  # DQMOffline
1307  step_string = "dqmHarvesting"
1308 
1309  harvesting_info["DQMOffline"]["step_string"] = step_string
1310 
1311  #----------
1312 
1313  self.harvesting_info = harvesting_info
1314 
1315  self.logger.info("Based on the CMSSW version (%s) " \
1316  "I decided to use the `HARVESTING:%s' " \
1317  "sequence for %s harvesting" % \
1318  (self.cmssw_version,
1319  self.harvesting_info[self.harvesting_type]["step_string"],
1320  self.harvesting_type))
1321 
1322  # End of setup_harvesting_info.
1323 
1324  ##########
1325 
1326  def create_castor_path_name_common(self, dataset_name):
1327  """Build the common part of the output path to be used on
1328  CASTOR.
1329 
1330  This consists of the CASTOR area base path specified by the
1331  user and a piece depending on the data type (data vs. MC), the
1332  harvesting type and the dataset name followed by a piece
1333  containing the run number and event count. (See comments in
1334  create_castor_path_name_special for details.) This method
1335  creates the common part, without run number and event count.
1336 
1337  """
1338 
1339  castor_path = self.castor_base_dir
1340 
1341  ###
1342 
1343  # The data type: data vs. mc.
1344  datatype = self.datasets_information[dataset_name]["datatype"]
1345  datatype = datatype.lower()
1346  castor_path = os.path.join(castor_path, datatype)
1347 
1348  # The harvesting type.
1349  harvesting_type = self.harvesting_type
1350  harvesting_type = harvesting_type.lower()
1351  castor_path = os.path.join(castor_path, harvesting_type)
1352 
1353  # The CMSSW release version (only the `digits'). Note that the
1354  # CMSSW version used here is the version used for harvesting,
1355  # not the one from the dataset. This does make the results
1356  # slightly harder to find. On the other hand it solves
1357  # problems in case one re-harvests a given dataset with a
1358  # different CMSSW version, which would lead to ambiguous path
1359  # names. (Of course for many cases the harvesting is done with
1360  # the same CMSSW version the dataset was created with.)
1361  release_version = self.cmssw_version
1362  release_version = release_version.lower(). \
1363  replace("cmssw", ""). \
1364  strip("_")
1365  castor_path = os.path.join(castor_path, release_version)
1366 
1367  # The dataset name.
1368  dataset_name_escaped = self.escape_dataset_name(dataset_name)
1369  castor_path = os.path.join(castor_path, dataset_name_escaped)
1370 
1371  ###
1372 
1373  castor_path = os.path.normpath(castor_path)
1374 
1375  # End of create_castor_path_name_common.
1376  return castor_path
1377 
1378  ##########
1379 
1381  dataset_name, run_number,
1382  castor_path_common):
1383  """Create the specialised part of the CASTOR output dir name.
1384 
1385  NOTE: To avoid clashes with `incremental harvesting'
1386  (re-harvesting when a dataset grows) we have to include the
1387  event count in the path name. The underlying `problem' is that
1388  CRAB does not overwrite existing output files so if the output
1389  file already exists CRAB will fail to copy back the output.
1390 
1391  NOTE: It's not possible to create different kinds of
1392  harvesting jobs in a single call to this tool. However, in
1393  principle it could be possible to create both data and MC jobs
1394  in a single go.
1395 
1396  NOTE: The number of events used in the path name is the
1397  _total_ number of events in the dataset/run at the time of
1398  harvesting. If we're doing partial harvesting the final
1399  results will reflect lower statistics. This is a) the easiest
1400  to code and b) the least likely to lead to confusion if
1401  someone ever decides to swap/copy around file blocks between
1402  sites.
1403 
1404  """
1405 
1406  castor_path = castor_path_common
1407 
1408  ###
1409 
1410  # The run number part.
1411  castor_path = os.path.join(castor_path, "run_%d" % run_number)
1412 
1413  ###
1414 
1415  # The event count (i.e. the number of events we currently see
1416  # for this dataset).
1417  #nevents = self.datasets_information[dataset_name] \
1418  # ["num_events"][run_number]
1419  castor_path = os.path.join(castor_path, "nevents")
1420 
1421  ###
1422 
1423  castor_path = os.path.normpath(castor_path)
1424 
1425  # End of create_castor_path_name_special.
1426  return castor_path
1427 
1428  ##########
1429 
1431  """Make sure all required CASTOR output dirs exist.
1432 
1433  This checks the CASTOR base dir specified by the user as well
1434  as all the subdirs required by the current set of jobs.
1435 
1436  """
1437 
1438  self.logger.info("Checking (and if necessary creating) CASTOR " \
1439  "output area(s)...")
1440 
1441  # Call the real checker method for the base dir.
1442  self.create_and_check_castor_dir(self.castor_base_dir)
1443 
1444  # Now call the checker for all (unique) subdirs.
1445  castor_dirs = []
1446  for (dataset_name, runs) in six.iteritems(self.datasets_to_use):
1447 
1448  for run in runs:
1449  castor_dirs.append(self.datasets_information[dataset_name] \
1450  ["castor_path"][run])
1451  castor_dirs_unique = sorted(set(castor_dirs))
1452  # This can take some time. E.g. CRAFT08 has > 300 runs, each
1453  # of which will get a new directory. So we show some (rough)
1454  # info in between.
1455  ndirs = len(castor_dirs_unique)
1456  step = max(ndirs / 10, 1)
1457  for (i, castor_dir) in enumerate(castor_dirs_unique):
1458  if (i + 1) % step == 0 or \
1459  (i + 1) == ndirs:
1460  self.logger.info(" %d/%d" % \
1461  (i + 1, ndirs))
1462  self.create_and_check_castor_dir(castor_dir)
1463 
1464  # Now check if the directory is empty. If (an old version
1465  # of) the output file already exists CRAB will run new
1466  # jobs but never copy the results back. We assume the user
1467  # knows what they are doing and only issue a warning in
1468  # case the directory is not empty.
1469  self.logger.debug("Checking if path `%s' is empty" % \
1470  castor_dir)
1471  cmd = "rfdir %s" % castor_dir
1472  (status, output) = commands.getstatusoutput(cmd)
1473  if status != 0:
1474  msg = "Could not access directory `%s'" \
1475  " !!! This is bad since I should have just" \
1476  " created it !!!" % castor_dir
1477  self.logger.fatal(msg)
1478  raise Error(msg)
1479  if len(output) > 0:
1480  self.logger.warning("Output directory `%s' is not empty:" \
1481  " new jobs will fail to" \
1482  " copy back output" % \
1483  castor_dir)
1484 
1485  # End of create_and_check_castor_dirs.
1486 
1487  ##########
1488 
1489  def create_and_check_castor_dir(self, castor_dir):
1490  """Check existence of the give CASTOR dir, if necessary create
1491  it.
1492 
1493  Some special care has to be taken with several things like
1494  setting the correct permissions such that CRAB can store the
1495  output results. Of course this means that things like
1496  /castor/cern.ch/ and user/j/ have to be recognised and treated
1497  properly.
1498 
1499  NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1500  the moment.
1501 
1502  NOTE: This method uses some slightly tricky caching to make
1503  sure we don't keep over and over checking the same base paths.
1504 
1505  """
1506 
1507  ###
1508 
1509  # Local helper function to fully split a path into pieces.
1510  def split_completely(path):
1511  (parent_path, name) = os.path.split(path)
1512  if name == "":
1513  return (parent_path, )
1514  else:
1515  return split_completely(parent_path) + (name, )
1516 
1517  ###
1518 
1519  # Local helper function to check rfio (i.e. CASTOR)
1520  # directories.
1521  def extract_permissions(rfstat_output):
1522  """Parse the output from rfstat and return the
1523  5-digit permissions string."""
1524 
1525  permissions_line = [i for i in output.split("\n") \
1526  if i.lower().find("protection") > -1]
1527  regexp = re.compile(".*\(([0123456789]{5})\).*")
1528  match = regexp.search(rfstat_output)
1529  if not match or len(match.groups()) != 1:
1530  msg = "Could not extract permissions " \
1531  "from output: %s" % rfstat_output
1532  self.logger.fatal(msg)
1533  raise Error(msg)
1534  permissions = match.group(1)
1535 
1536  # End of extract_permissions.
1537  return permissions
1538 
1539  ###
1540 
1541  # These are the pieces of CASTOR directories that we do not
1542  # want to touch when modifying permissions.
1543 
1544  # NOTE: This is all a bit involved, basically driven by the
1545  # fact that one wants to treat the `j' directory of
1546  # `/castor/cern.ch/user/j/jhegeman/' specially.
1547  # BUG BUG BUG
1548  # This should be simplified, for example by comparing to the
1549  # CASTOR prefix or something like that.
1550  # BUG BUG BUG end
1551  castor_paths_dont_touch = {
1552  0: ["/", "castor", "cern.ch", "cms", "store", "temp",
1553  "dqm", "offline", "user"],
1554  -1: ["user", "store"]
1555  }
1556 
1557  self.logger.debug("Checking CASTOR path `%s'" % castor_dir)
1558 
1559  ###
1560 
1561  # First we take the full CASTOR path apart.
1562  castor_path_pieces = split_completely(castor_dir)
1563 
1564  # Now slowly rebuild the CASTOR path and see if a) all
1565  # permissions are set correctly and b) the final destination
1566  # exists.
1567  path = ""
1568  check_sizes = sorted(castor_paths_dont_touch.keys())
1569  len_castor_path_pieces = len(castor_path_pieces)
1570  for piece_index in xrange (len_castor_path_pieces):
1571  skip_this_path_piece = False
1572  piece = castor_path_pieces[piece_index]
1573 ## self.logger.debug("Checking CASTOR path piece `%s'" % \
1574 ## piece)
1575  for check_size in check_sizes:
1576  # Do we need to do anything with this?
1577  if (piece_index + check_size) > -1:
1578 ## self.logger.debug("Checking `%s' against `%s'" % \
1579 ## (castor_path_pieces[piece_index + check_size],
1580 ## castor_paths_dont_touch[check_size]))
1581  if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
1582 ## self.logger.debug(" skipping")
1583  skip_this_path_piece = True
1584 ## else:
1585 ## # Piece not in the list, fine.
1586 ## self.logger.debug(" accepting")
1587  # Add piece to the path we're building.
1588 ## self.logger.debug("!!! Skip path piece `%s'? %s" % \
1589 ## (piece, str(skip_this_path_piece)))
1590 ## self.logger.debug("Adding piece to path...")
1591  path = os.path.join(path, piece)
1592 ## self.logger.debug("Path is now `%s'" % \
1593 ## path)
1594 
1595  # Hmmmm, only at this point can we do some caching. Not
1596  # ideal, but okay.
1597  try:
1598  if path in self.castor_path_checks_cache:
1599  continue
1600  except AttributeError:
1601  # This only happens the first time around.
1602  self.castor_path_checks_cache = []
1603  self.castor_path_checks_cache.append(path)
1604 
1605  # Now, unless we're supposed to skip this piece of the
1606  # path, let's make sure it exists and set the permissions
1607  # correctly for use by CRAB. This means that:
1608  # - the final output directory should (at least) have
1609  # permissions 775
1610  # - all directories above that should (at least) have
1611  # permissions 755.
1612 
1613  # BUT: Even though the above permissions are the usual
1614  # ones to used when setting up CASTOR areas for grid job
1615  # output, there is one caveat in case multiple people are
1616  # working in the same CASTOR area. If user X creates
1617  # /a/b/c/ and user Y wants to create /a/b/d/ he/she does
1618  # not have sufficient rights. So: we set all dir
1619  # permissions to 775 to avoid this.
1620 
1621  if not skip_this_path_piece:
1622 
1623  # Ok, first thing: let's make sure this directory
1624  # exists.
1625  # NOTE: The nice complication is of course that the
1626  # usual os.path.isdir() etc. methods don't work for an
1627  # rfio filesystem. So we call rfstat and interpret an
1628  # error as meaning that the path does not exist.
1629  self.logger.debug("Checking if path `%s' exists" % \
1630  path)
1631  cmd = "rfstat %s" % path
1632  (status, output) = commands.getstatusoutput(cmd)
1633  if status != 0:
1634  # Path does not exist, let's try and create it.
1635  self.logger.debug("Creating path `%s'" % path)
1636  cmd = "nsmkdir -m 775 %s" % path
1637  (status, output) = commands.getstatusoutput(cmd)
1638  if status != 0:
1639  msg = "Could not create directory `%s'" % path
1640  self.logger.fatal(msg)
1641  raise Error(msg)
1642  cmd = "rfstat %s" % path
1643  (status, output) = commands.getstatusoutput(cmd)
1644  # Now check that it looks like a directory. If I'm not
1645  # mistaken one can deduce this from the fact that the
1646  # (octal) permissions string starts with `40' (instead
1647  # of `100').
1648  permissions = extract_permissions(output)
1649  if not permissions.startswith("40"):
1650  msg = "Path `%s' is not a directory(?)" % path
1651  self.logger.fatal(msg)
1652  raise Error(msg)
1653 
1654  # Figure out the current permissions for this
1655  # (partial) path.
1656  self.logger.debug("Checking permissions for path `%s'" % path)
1657  cmd = "rfstat %s" % path
1658  (status, output) = commands.getstatusoutput(cmd)
1659  if status != 0:
1660  msg = "Could not obtain permissions for directory `%s'" % \
1661  path
1662  self.logger.fatal(msg)
1663  raise Error(msg)
1664  # Take the last three digits of the permissions.
1665  permissions = extract_permissions(output)[-3:]
1666 
1667  # Now if necessary fix permissions.
1668  # NOTE: Be careful never to `downgrade' permissions.
1669  if piece_index == (len_castor_path_pieces - 1):
1670  # This means we're looking at the final
1671  # destination directory.
1672  permissions_target = "775"
1673  else:
1674  # `Only' an intermediate directory.
1675  permissions_target = "775"
1676 
1677  # Compare permissions.
1678  permissions_new = []
1679  for (i, j) in zip(permissions, permissions_target):
1680  permissions_new.append(str(max(int(i), int(j))))
1681  permissions_new = "".join(permissions_new)
1682  self.logger.debug(" current permissions: %s" % \
1683  permissions)
1684  self.logger.debug(" target permissions : %s" % \
1685  permissions_target)
1686  if permissions_new != permissions:
1687  # We have to modify the permissions.
1688  self.logger.debug("Changing permissions of `%s' " \
1689  "to %s (were %s)" % \
1690  (path, permissions_new, permissions))
1691  cmd = "rfchmod %s %s" % (permissions_new, path)
1692  (status, output) = commands.getstatusoutput(cmd)
1693  if status != 0:
1694  msg = "Could not change permissions for path `%s' " \
1695  "to %s" % (path, permissions_new)
1696  self.logger.fatal(msg)
1697  raise Error(msg)
1698 
1699  self.logger.debug(" Permissions ok (%s)" % permissions_new)
1700 
1701  # End of create_and_check_castor_dir.
1702 
1703  ##########
1704 
1705  def pick_a_site(self, sites, cmssw_version):
1706 
1707  # Create list of forbidden sites
1708  sites_forbidden = []
1709 
1710  if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
1711  self.caf_access = True
1712 
1713  if self.caf_access == False:
1714  sites_forbidden.append("caf.cern.ch")
1715 
1716  # These are the T1 sites. These are only forbidden if we're
1717  # running in non-T1 mode.
1718  # Source:
1719  # https://cmsweb.cern.ch/sitedb/sitelist/?naming_scheme=ce
1720  # Hard-coded, yes. Not nice, no.
1721 
1722  all_t1 = [
1723  "srm-cms.cern.ch",
1724  "ccsrm.in2p3.fr",
1725  "cmssrm-fzk.gridka.de",
1726  "cmssrm.fnal.gov",
1727  "gridka-dCache.fzk.de",
1728  "srm-cms.gridpp.rl.ac.uk",
1729  "srm.grid.sinica.edu.tw",
1730  "srm2.grid.sinica.edu.tw",
1731  "srmcms.pic.es",
1732  "storm-fe-cms.cr.cnaf.infn.it"
1733  ]
1734 
1735  country_codes = {
1736  "CAF" : "caf.cern.ch",
1737  "CH" : "srm-cms.cern.ch",
1738  "FR" : "ccsrm.in2p3.fr",
1739  "DE" : "cmssrm-fzk.gridka.de",
1740  "GOV" : "cmssrm.fnal.gov",
1741  "DE2" : "gridka-dCache.fzk.de",
1742  "UK" : "srm-cms.gridpp.rl.ac.uk",
1743  "TW" : "srm.grid.sinica.edu.tw",
1744  "TW2" : "srm2.grid.sinica.edu.tw",
1745  "ES" : "srmcms.pic.es",
1746  "IT" : "storm-fe-cms.cr.cnaf.infn.it"
1747  }
1748 
1749  if self.non_t1access:
1750  sites_forbidden.extend(all_t1)
1751 
1752  for site in sites_forbidden:
1753  if site in sites:
1754  sites.remove(site)
1755 
1756  if self.preferred_site in country_codes:
1757  self.preferred_site = country_codes[self.preferred_site]
1758 
1759  if self.preferred_site != "no preference":
1760  if self.preferred_site in sites:
1761  sites = [self.preferred_site]
1762  else:
1763  sites= []
1764 
1765  #print sites
1766 
1767  # Looks like we have to do some caching here, otherwise things
1768  # become waaaay toooo sloooooow. So that's what the
1769  # sites_and_versions_cache does.
1770 
1771  # NOTE: Keep this set to None!
1772  site_name = None
1773  cmd = None
1774  while len(sites) > 0 and \
1775  site_name is None:
1776 
1777  # Create list of t1_sites
1778  t1_sites = []
1779  for site in sites:
1780  if site in all_t1:
1781  t1_sites.append(site)
1782  if site == "caf.cern.ch":
1783  t1_sites.append(site)
1784 
1785  # If avilable pick preferred site
1786  #if self.preferred_site in sites:
1787  # se_name = self.preferred_site
1788  # Else, if available pick t1 site
1789 
1790  if len(t1_sites) > 0:
1791  se_name = choice(t1_sites)
1792  # Else pick any site
1793  else:
1794  se_name = choice(sites)
1795 
1796  # But check that it hosts the CMSSW version we want.
1797 
1798  if se_name in self.sites_and_versions_cache and \
1799  cmssw_version in self.sites_and_versions_cache[se_name]:
1800  if self.sites_and_versions_cache[se_name][cmssw_version]:
1801  site_name = se_name
1802  break
1803  else:
1804  self.logger.debug(" --> rejecting site `%s'" % se_name)
1805  sites.remove(se_name)
1806 
1807  else:
1808  self.logger.info("Checking if site `%s' " \
1809  "has CMSSW version `%s'" % \
1810  (se_name, cmssw_version))
1811  self.sites_and_versions_cache[se_name] = {}
1812 
1813  # TODO TODO TODO
1814  # Test for SCRAM architecture removed as per request
1815  # from Andreas.
1816  # scram_arch = os.getenv("SCRAM_ARCH")
1817  # cmd = "lcg-info --list-ce " \
1818  # "--query '" \
1819  # "Tag=VO-cms-%s," \
1820  # "Tag=VO-cms-%s," \
1821  # "CEStatus=Production," \
1822  # "CloseSE=%s'" % \
1823  # (cmssw_version, scram_arch, se_name)
1824  # TODO TODO TODO end
1825 
1826  cmd = "lcg-info --list-ce " \
1827  "--query '" \
1828  "Tag=VO-cms-%s," \
1829  "CEStatus=Production," \
1830  "CloseSE=%s'" % \
1831  (cmssw_version, se_name)
1832  (status, output) = commands.getstatusoutput(cmd)
1833  if status != 0:
1834  self.logger.error("Could not check site information " \
1835  "for site `%s'" % se_name)
1836  else:
1837  if (len(output) > 0) or (se_name == "caf.cern.ch"):
1838  self.sites_and_versions_cache[se_name][cmssw_version] = True
1839  site_name = se_name
1840  break
1841  else:
1842  self.sites_and_versions_cache[se_name][cmssw_version] = False
1843  self.logger.debug(" --> rejecting site `%s'" % se_name)
1844  sites.remove(se_name)
1845 
1846  if site_name is self.no_matching_site_found_str:
1847  self.logger.error(" --> no matching site found")
1848  self.logger.error(" --> Your release or SCRAM " \
1849  "architecture may not be available" \
1850  "anywhere on the (LCG) grid.")
1851  if not cmd is None:
1852  self.logger.debug(" (command used: `%s')" % cmd)
1853  else:
1854  self.logger.debug(" --> selected site `%s'" % site_name)
1855 
1856  # Return something more descriptive (than `None') in case we
1857  # found nothing.
1858  if site_name is None:
1859  site_name = self.no_matching_site_found_str
1860  # Keep track of our global flag signifying that this
1861  # happened.
1862  self.all_sites_found = False
1863 
1864  # End of pick_a_site.
1865  return site_name
1866 
1867  ##########
1868 
1870 
1871  # Set up the command line parser. Note that we fix up the help
1872  # formatter so that we can add some text pointing people to
1873  # the Twiki etc.
1874  parser = optparse.OptionParser(version="%s %s" % \
1875  ("%prog", self.version),
1876  formatter=CMSHarvesterHelpFormatter())
1877 
1878  self.option_parser = parser
1879 
1880  # The debug switch.
1881  parser.add_option("-d", "--debug",
1882  help="Switch on debug mode",
1883  action="callback",
1884  callback=self.option_handler_debug)
1885 
1886  # The quiet switch.
1887  parser.add_option("-q", "--quiet",
1888  help="Be less verbose",
1889  action="callback",
1890  callback=self.option_handler_quiet)
1891 
1892  # The force switch. If this switch is used sanity checks are
1893  # performed but failures do not lead to aborts. Use with care.
1894  parser.add_option("", "--force",
1895  help="Force mode. Do not abort on sanity check "
1896  "failures",
1897  action="callback",
1898  callback=self.option_handler_force)
1899 
1900  # Choose between the different kinds of harvesting.
1901  parser.add_option("", "--harvesting_type",
1902  help="Harvesting type: %s" % \
1903  ", ".join(self.harvesting_types),
1904  action="callback",
1905  callback=self.option_handler_harvesting_type,
1906  type="string",
1907  metavar="HARVESTING_TYPE")
1908 
1909  # Choose between single-step and two-step mode.
1910  parser.add_option("", "--harvesting_mode",
1911  help="Harvesting mode: %s (default = %s)" % \
1912  (", ".join(self.harvesting_modes),
1913  self.harvesting_mode_default),
1914  action="callback",
1915  callback=self.option_handler_harvesting_mode,
1916  type="string",
1917  metavar="HARVESTING_MODE")
1918 
1919  # Override the GlobalTag chosen by the cmsHarvester.
1920  parser.add_option("", "--globaltag",
1921  help="GlobalTag to use. Default is the ones " \
1922  "the dataset was created with for MC, for data" \
1923  "a GlobalTag has to be specified.",
1924  action="callback",
1925  callback=self.option_handler_globaltag,
1926  type="string",
1927  metavar="GLOBALTAG")
1928 
1929  # Allow switching off of reference histograms.
1930  parser.add_option("", "--no-ref-hists",
1931  help="Don't use any reference histograms",
1932  action="callback",
1933  callback=self.option_handler_no_ref_hists)
1934 
1935  # Allow the default (i.e. the one that should be used)
1936  # Frontier connection to be overridden.
1937  parser.add_option("", "--frontier-connection",
1938  help="Use this Frontier connection to find " \
1939  "GlobalTags and LocalTags (for reference " \
1940  "histograms).\nPlease only use this for " \
1941  "testing.",
1942  action="callback",
1943  callback=self.option_handler_frontier_connection,
1944  type="string",
1945  metavar="FRONTIER")
1946 
1947  # Similar to the above but specific to the Frontier connection
1948  # to be used for the GlobalTag.
1949  parser.add_option("", "--frontier-connection-for-globaltag",
1950  help="Use this Frontier connection to find " \
1951  "GlobalTags.\nPlease only use this for " \
1952  "testing.",
1953  action="callback",
1954  callback=self.option_handler_frontier_connection,
1955  type="string",
1956  metavar="FRONTIER")
1957 
1958  # Similar to the above but specific to the Frontier connection
1959  # to be used for the reference histograms.
1960  parser.add_option("", "--frontier-connection-for-refhists",
1961  help="Use this Frontier connection to find " \
1962  "LocalTags (for reference " \
1963  "histograms).\nPlease only use this for " \
1964  "testing.",
1965  action="callback",
1966  callback=self.option_handler_frontier_connection,
1967  type="string",
1968  metavar="FRONTIER")
1969 
1970  # Option to specify the name (or a regexp) of the dataset(s)
1971  # to be used.
1972  parser.add_option("", "--dataset",
1973  help="Name (or regexp) of dataset(s) to process",
1974  action="callback",
1975  #callback=self.option_handler_dataset_name,
1976  callback=self.option_handler_input_spec,
1977  type="string",
1978  #dest="self.input_name",
1979  metavar="DATASET")
1980 
1981  # Option to specify the name (or a regexp) of the dataset(s)
1982  # to be ignored.
1983  parser.add_option("", "--dataset-ignore",
1984  help="Name (or regexp) of dataset(s) to ignore",
1985  action="callback",
1986  callback=self.option_handler_input_spec,
1987  type="string",
1988  metavar="DATASET-IGNORE")
1989 
1990  # Option to specify the name (or a regexp) of the run(s)
1991  # to be used.
1992  parser.add_option("", "--runs",
1993  help="Run number(s) to process",
1994  action="callback",
1995  callback=self.option_handler_input_spec,
1996  type="string",
1997  metavar="RUNS")
1998 
1999  # Option to specify the name (or a regexp) of the run(s)
2000  # to be ignored.
2001  parser.add_option("", "--runs-ignore",
2002  help="Run number(s) to ignore",
2003  action="callback",
2004  callback=self.option_handler_input_spec,
2005  type="string",
2006  metavar="RUNS-IGNORE")
2007 
2008  # Option to specify a file containing a list of dataset names
2009  # (or regexps) to be used.
2010  parser.add_option("", "--datasetfile",
2011  help="File containing list of dataset names " \
2012  "(or regexps) to process",
2013  action="callback",
2014  #callback=self.option_handler_listfile_name,
2015  callback=self.option_handler_input_spec,
2016  type="string",
2017  #dest="self.input_name",
2018  metavar="DATASETFILE")
2019 
2020  # Option to specify a file containing a list of dataset names
2021  # (or regexps) to be ignored.
2022  parser.add_option("", "--datasetfile-ignore",
2023  help="File containing list of dataset names " \
2024  "(or regexps) to ignore",
2025  action="callback",
2026  callback=self.option_handler_input_spec,
2027  type="string",
2028  metavar="DATASETFILE-IGNORE")
2029 
2030  # Option to specify a file containing a list of runs to be
2031  # used.
2032  parser.add_option("", "--runslistfile",
2033  help="File containing list of run numbers " \
2034  "to process",
2035  action="callback",
2036  callback=self.option_handler_input_spec,
2037  type="string",
2038  metavar="RUNSLISTFILE")
2039 
2040  # Option to specify a file containing a list of runs
2041  # to be ignored.
2042  parser.add_option("", "--runslistfile-ignore",
2043  help="File containing list of run numbers " \
2044  "to ignore",
2045  action="callback",
2046  callback=self.option_handler_input_spec,
2047  type="string",
2048  metavar="RUNSLISTFILE-IGNORE")
2049 
2050  # Option to specify a Jsonfile contaning a list of runs
2051  # to be used.
2052  parser.add_option("", "--Jsonrunfile",
2053  help="Jsonfile containing dictionary of run/lumisections pairs. " \
2054  "All lumisections of runs contained in dictionary are processed.",
2055  action="callback",
2056  callback=self.option_handler_input_Jsonrunfile,
2057  type="string",
2058  metavar="JSONRUNFILE")
2059 
2060  # Option to specify a Jsonfile contaning a dictionary of run/lumisections pairs
2061  # to be used.
2062  parser.add_option("", "--Jsonfile",
2063  help="Jsonfile containing dictionary of run/lumisections pairs. " \
2064  "Only specified lumisections of runs contained in dictionary are processed.",
2065  action="callback",
2066  callback=self.option_handler_input_Jsonfile,
2067  type="string",
2068  metavar="JSONFILE")
2069 
2070  # Option to specify a ToDo file contaning a list of runs
2071  # to be used.
2072  parser.add_option("", "--todo-file",
2073  help="Todo file containing a list of runs to process.",
2074  action="callback",
2075  callback=self.option_handler_input_todofile,
2076  type="string",
2077  metavar="TODO-FILE")
2078 
2079  # Option to specify which file to use for the dataset name to
2080  # reference histogram name mappings.
2081  parser.add_option("", "--refhistmappingfile",
2082  help="File to be use for the reference " \
2083  "histogram mappings. Default: `%s'." % \
2084  self.ref_hist_mappings_file_name_default,
2085  action="callback",
2086  callback=self.option_handler_ref_hist_mapping_file,
2087  type="string",
2088  metavar="REFHISTMAPPING-FILE")
2089 
2090  # Specify the place in CASTOR where the output should go.
2091  # NOTE: Only output to CASTOR is supported for the moment,
2092  # since the central DQM results place is on CASTOR anyway.
2093  parser.add_option("", "--castordir",
2094  help="Place on CASTOR to store results. " \
2095  "Default: `%s'." % \
2096  self.castor_base_dir_default,
2097  action="callback",
2098  callback=self.option_handler_castor_dir,
2099  type="string",
2100  metavar="CASTORDIR")
2101 
2102  # Use this to try and create jobs that will run without
2103  # special `t1access' role.
2104  parser.add_option("", "--no-t1access",
2105  help="Try to create jobs that will run " \
2106  "without special `t1access' role",
2107  action="callback",
2108  callback=self.option_handler_no_t1access)
2109 
2110  # Use this to create jobs that may run on CAF
2111  parser.add_option("", "--caf-access",
2112  help="Crab jobs may run " \
2113  "on CAF",
2114  action="callback",
2115  callback=self.option_handler_caf_access)
2116 
2117  # set process.dqmSaver.saveByLumiSection=1 in harvesting cfg file
2118  parser.add_option("", "--saveByLumiSection",
2119  help="set saveByLumiSection=1 in harvesting cfg file",
2120  action="callback",
2121  callback=self.option_handler_saveByLumiSection)
2122 
2123  # Use this to enable automatic creation and submission of crab jobs
2124  parser.add_option("", "--automatic-crab-submission",
2125  help="Crab jobs are created and " \
2126  "submitted automatically",
2127  action="callback",
2128  callback=self.option_handler_crab_submission)
2129 
2130  # Option to set the max number of sites, each
2131  #job is submitted to
2132  parser.add_option("", "--max-sites",
2133  help="Max. number of sites each job is submitted to",
2134  action="callback",
2135  callback=self.option_handler_sites,
2136  type="int")
2137 
2138  # Option to set the preferred site
2139  parser.add_option("", "--site",
2140  help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
2141  srm-cms.cern.ch : CH \
2142  ccsrm.in2p3.fr : FR \
2143  cmssrm-fzk.gridka.de : DE \
2144  cmssrm.fnal.gov : GOV \
2145  gridka-dCache.fzk.de : DE2 \
2146  rm-cms.gridpp.rl.ac.uk : UK \
2147  srm.grid.sinica.edu.tw : TW \
2148  srm2.grid.sinica.edu.tw : TW2 \
2149  srmcms.pic.es : ES \
2150  storm-fe-cms.cr.cnaf.infn.it : IT",
2151  action="callback",
2152  callback=self.option_handler_preferred_site,
2153  type="string")
2154 
2155  # This is the command line flag to list all harvesting
2156  # type-to-sequence mappings.
2157  parser.add_option("-l", "--list",
2158  help="List all harvesting types and their" \
2159  "corresponding sequence names",
2160  action="callback",
2161  callback=self.option_handler_list_types)
2162 
2163  # If nothing was specified: tell the user how to do things the
2164  # next time and exit.
2165  # NOTE: We just use the OptParse standard way of doing this by
2166  # acting as if a '--help' was specified.
2167  if len(self.cmd_line_opts) < 1:
2168  self.cmd_line_opts = ["--help"]
2169 
2170  # Some trickery with the options. Why? Well, since these
2171  # options change the output level immediately from the option
2172  # handlers, the results differ depending on where they are on
2173  # the command line. Let's just make sure they are at the
2174  # front.
2175  # NOTE: Not very efficient or sophisticated, but it works and
2176  # it only has to be done once anyway.
2177  for i in ["-d", "--debug",
2178  "-q", "--quiet"]:
2179  if i in self.cmd_line_opts:
2180  self.cmd_line_opts.remove(i)
2181  self.cmd_line_opts.insert(0, i)
2182 
2183  # Everything is set up, now parse what we were given.
2184  parser.set_defaults()
2185  (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2186 
2187  # End of parse_cmd_line_options.
2188 
2189  ##########
2190 
2192  """Check completeness and correctness of input information.
2193 
2194  Check that all required information has been specified and
2195  that, at least as far as can be easily checked, it makes
2196  sense.
2197 
2198  NOTE: This is also where any default values are applied.
2199 
2200  """
2201 
2202  self.logger.info("Checking completeness/correctness of input...")
2203 
2204  # The cmsHarvester does not take (i.e. understand) any
2205  # arguments so there should not be any.
2206  if len(self.args) > 0:
2207  msg = "Sorry but I don't understand `%s'" % \
2208  (" ".join(self.args))
2209  self.logger.fatal(msg)
2210  raise Usage(msg)
2211 
2212  # BUG BUG BUG
2213  # While we wait for some bugs left and right to get fixed, we
2214  # disable two-step.
2215  if self.harvesting_mode == "two-step":
2216  msg = "--------------------\n" \
2217  " Sorry, but for the moment (well, till it works)" \
2218  " the two-step mode has been disabled.\n" \
2219  "--------------------\n"
2220  self.logger.fatal(msg)
2221  raise Error(msg)
2222  # BUG BUG BUG end
2223 
2224  # We need a harvesting method to be specified
2225  if self.harvesting_type is None:
2226  msg = "Please specify a harvesting type"
2227  self.logger.fatal(msg)
2228  raise Usage(msg)
2229  # as well as a harvesting mode.
2230  if self.harvesting_mode is None:
2231  self.harvesting_mode = self.harvesting_mode_default
2232  msg = "No harvesting mode specified --> using default `%s'" % \
2233  self.harvesting_mode
2234  self.logger.warning(msg)
2235  #raise Usage(msg)
2236 
2237  ###
2238 
2239  # We need an input method so we can find the dataset name(s).
2240  if self.input_method["datasets"]["use"] is None:
2241  msg = "Please specify an input dataset name " \
2242  "or a list file name"
2243  self.logger.fatal(msg)
2244  raise Usage(msg)
2245 
2246  # DEBUG DEBUG DEBUG
2247  # If we get here, we should also have an input name.
2248  assert not self.input_name["datasets"]["use"] is None
2249  # DEBUG DEBUG DEBUG end
2250 
2251  ###
2252 
2253  # The same holds for the reference histogram mapping file (if
2254  # we're using references).
2255  if self.use_ref_hists:
2256  if self.ref_hist_mappings_file_name is None:
2257  self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2258  msg = "No reference histogram mapping file specified --> " \
2259  "using default `%s'" % \
2260  self.ref_hist_mappings_file_name
2261  self.logger.warning(msg)
2262 
2263  ###
2264 
2265  # We need to know where to put the stuff (okay, the results)
2266  # on CASTOR.
2267  if self.castor_base_dir is None:
2268  self.castor_base_dir = self.castor_base_dir_default
2269  msg = "No CASTOR area specified -> using default `%s'" % \
2270  self.castor_base_dir
2271  self.logger.warning(msg)
2272  #raise Usage(msg)
2273 
2274  # Only the CERN CASTOR area is supported.
2275  if not self.castor_base_dir.startswith(self.castor_prefix):
2276  msg = "CASTOR area does not start with `%s'" % \
2277  self.castor_prefix
2278  self.logger.fatal(msg)
2279  if self.castor_base_dir.find("castor") > -1 and \
2280  not self.castor_base_dir.find("cern.ch") > -1:
2281  self.logger.fatal("Only CERN CASTOR is supported")
2282  raise Usage(msg)
2283 
2284  ###
2285 
2286  # TODO TODO TODO
2287  # This should be removed in the future, once I find out how to
2288  # get the config file used to create a given dataset from DBS.
2289 
2290  # For data we need to have a GlobalTag. (For MC we can figure
2291  # it out by ourselves.)
2292  if self.globaltag is None:
2293  self.logger.warning("No GlobalTag specified. This means I cannot")
2294  self.logger.warning("run on data, only on MC.")
2295  self.logger.warning("I will skip all data datasets.")
2296 
2297  # TODO TODO TODO end
2298 
2299  # Make sure the GlobalTag ends with `::All'.
2300  if not self.globaltag is None:
2301  if not self.globaltag.endswith("::All"):
2302  self.logger.warning("Specified GlobalTag `%s' does " \
2303  "not end in `::All' --> " \
2304  "appending this missing piece" % \
2305  self.globaltag)
2306  self.globaltag = "%s::All" % self.globaltag
2307 
2308  ###
2309 
2310  # Dump some info about the Frontier connections used.
2311  for (key, value) in six.iteritems(self.frontier_connection_name):
2312  frontier_type_str = "unknown"
2313  if key == "globaltag":
2314  frontier_type_str = "the GlobalTag"
2315  elif key == "refhists":
2316  frontier_type_str = "the reference histograms"
2317  non_str = None
2318  if self.frontier_connection_overridden[key] == True:
2319  non_str = "non-"
2320  else:
2321  non_str = ""
2322  self.logger.info("Using %sdefault Frontier " \
2323  "connection for %s: `%s'" % \
2324  (non_str, frontier_type_str, value))
2325 
2326  ###
2327 
2328  # End of check_input_status.
2329 
2330  ##########
2331 
2332  def check_cmssw(self):
2333  """Check if CMSSW is setup.
2334 
2335  """
2336 
2337  # Try to access the CMSSW_VERSION environment variable. If
2338  # it's something useful we consider CMSSW to be set up
2339  # properly. Otherwise we raise an error.
2340  cmssw_version = os.getenv("CMSSW_VERSION")
2341  if cmssw_version is None:
2342  self.logger.fatal("It seems CMSSW is not setup...")
2343  self.logger.fatal("($CMSSW_VERSION is empty)")
2344  raise Error("ERROR: CMSSW needs to be setup first!")
2345 
2346  self.cmssw_version = cmssw_version
2347  self.logger.info("Found CMSSW version %s properly set up" % \
2348  self.cmssw_version)
2349 
2350  # End of check_cmsssw.
2351  return True
2352 
2353  ##########
2354 
2355  def check_dbs(self):
2356  """Check if DBS is setup.
2357 
2358  """
2359 
2360  # Try to access the DBSCMD_HOME environment variable. If this
2361  # looks useful we consider DBS to be set up
2362  # properly. Otherwise we raise an error.
2363  dbs_home = os.getenv("DBSCMD_HOME")
2364  if dbs_home is None:
2365  self.logger.fatal("It seems DBS is not setup...")
2366  self.logger.fatal(" $DBSCMD_HOME is empty")
2367  raise Error("ERROR: DBS needs to be setup first!")
2368 
2369 ## # Now we try to do a very simple DBS search. If that works
2370 ## # instead of giving us the `Unsupported API call' crap, we
2371 ## # should be good to go.
2372 ## # NOTE: Not ideal, I know, but it reduces the amount of
2373 ## # complaints I get...
2374 ## cmd = "dbs search --query=\"find dataset where dataset = impossible\""
2375 ## (status, output) = commands.getstatusoutput(cmd)
2376 ## pdb.set_trace()
2377 ## if status != 0 or \
2378 ## output.lower().find("unsupported api call") > -1:
2379 ## self.logger.fatal("It seems DBS is not setup...")
2380 ## self.logger.fatal(" %s returns crap:" % cmd)
2381 ## for line in output.split("\n"):
2382 ## self.logger.fatal(" %s" % line)
2383 ## raise Error("ERROR: DBS needs to be setup first!")
2384 
2385  self.logger.debug("Found DBS properly set up")
2386 
2387  # End of check_dbs.
2388  return True
2389 
2390  ##########
2391 
2392  def setup_dbs(self):
2393  """Setup the Python side of DBS.
2394 
2395  For more information see the DBS Python API documentation:
2396  https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2397 
2398  """
2399 
2400  try:
2401  args={}
2402  args["url"]= "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2403  "servlet/DBSServlet"
2404  api = DbsApi(args)
2405  self.dbs_api = api
2406 
2407  except DBSAPI.dbsApiException.DbsApiException as ex:
2408  self.logger.fatal("Caught DBS API exception %s: %s " % \
2409  (ex.getClassName(), ex.getErrorMessage()))
2410  if ex.getErrorCode() not in (None, ""):
2411  logger.debug("DBS exception error code: ", ex.getErrorCode())
2412  raise
2413 
2414  # End of setup_dbs.
2415 
2416  ##########
2417 
2418  def dbs_resolve_dataset_name(self, dataset_name):
2419  """Use DBS to resolve a wildcarded dataset name.
2420 
2421  """
2422 
2423  # DEBUG DEBUG DEBUG
2424  # If we get here DBS should have been set up already.
2425  assert not self.dbs_api is None
2426  # DEBUG DEBUG DEBUG end
2427 
2428  # Some minor checking to make sure that whatever we've been
2429  # given as dataset name actually sounds like a dataset name.
2430  if not (dataset_name.startswith("/") and \
2431  dataset_name.endswith("RECO")):
2432  self.logger.warning("Dataset name `%s' does not sound " \
2433  "like a valid dataset name!" % \
2434  dataset_name)
2435 
2436  #----------
2437 
2438  api = self.dbs_api
2439  dbs_query = "find dataset where dataset like %s " \
2440  "and dataset.status = VALID" % \
2441  dataset_name
2442  try:
2443  api_result = api.executeQuery(dbs_query)
2444  except DBSAPI.dbsApiException.DbsApiException:
2445  msg = "ERROR: Could not execute DBS query"
2446  self.logger.fatal(msg)
2447  raise Error(msg)
2448 
2449  # Setup parsing.
2450  handler = DBSXMLHandler(["dataset"])
2451  parser = xml.sax.make_parser()
2452  parser.setContentHandler(handler)
2453 
2454  # Parse.
2455  try:
2456  xml.sax.parseString(api_result, handler)
2457  except SAXParseException:
2458  msg = "ERROR: Could not parse DBS server output"
2459  self.logger.fatal(msg)
2460  raise Error(msg)
2461 
2462  # DEBUG DEBUG DEBUG
2463  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2464  # DEBUG DEBUG DEBUG end
2465 
2466  # Extract the results.
2467  datasets = handler.results.values()[0]
2468 
2469  # End of dbs_resolve_dataset_name.
2470  return datasets
2471 
2472  ##########
2473 
2474  def dbs_resolve_cmssw_version(self, dataset_name):
2475  """Ask DBS for the CMSSW version used to create this dataset.
2476 
2477  """
2478 
2479  # DEBUG DEBUG DEBUG
2480  # If we get here DBS should have been set up already.
2481  assert not self.dbs_api is None
2482  # DEBUG DEBUG DEBUG end
2483 
2484  api = self.dbs_api
2485  dbs_query = "find algo.version where dataset = %s " \
2486  "and dataset.status = VALID" % \
2487  dataset_name
2488  try:
2489  api_result = api.executeQuery(dbs_query)
2490  except DBSAPI.dbsApiException.DbsApiException:
2491  msg = "ERROR: Could not execute DBS query"
2492  self.logger.fatal(msg)
2493  raise Error(msg)
2494 
2495  handler = DBSXMLHandler(["algo.version"])
2496  parser = xml.sax.make_parser()
2497  parser.setContentHandler(handler)
2498 
2499  try:
2500  xml.sax.parseString(api_result, handler)
2501  except SAXParseException:
2502  msg = "ERROR: Could not parse DBS server output"
2503  self.logger.fatal(msg)
2504  raise Error(msg)
2505 
2506  # DEBUG DEBUG DEBUG
2507  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2508  # DEBUG DEBUG DEBUG end
2509 
2510  cmssw_version = handler.results.values()[0]
2511 
2512  # DEBUG DEBUG DEBUG
2513  assert len(cmssw_version) == 1
2514  # DEBUG DEBUG DEBUG end
2515 
2516  cmssw_version = cmssw_version[0]
2517 
2518  # End of dbs_resolve_cmssw_version.
2519  return cmssw_version
2520 
2521  ##########
2522 
2523 ## def dbs_resolve_dataset_number_of_events(self, dataset_name):
2524 ## """Ask DBS across how many events this dataset has been spread
2525 ## out.
2526 
2527 ## This is especially useful to check that we do not submit a job
2528 ## supposed to run on a complete sample that is not contained at
2529 ## a single site.
2530 
2531 ## """
2532 
2533 ## # DEBUG DEBUG DEBUG
2534 ## # If we get here DBS should have been set up already.
2535 ## assert not self.dbs_api is None
2536 ## # DEBUG DEBUG DEBUG end
2537 
2538 ## api = self.dbs_api
2539 ## dbs_query = "find count(site) where dataset = %s " \
2540 ## "and dataset.status = VALID" % \
2541 ## dataset_name
2542 ## try:
2543 ## api_result = api.executeQuery(dbs_query)
2544 ## except DbsApiException:
2545 ## raise Error("ERROR: Could not execute DBS query")
2546 
2547 ## try:
2548 ## num_events = []
2549 ## class Handler(xml.sax.handler.ContentHandler):
2550 ## def startElement(self, name, attrs):
2551 ## if name == "result":
2552 ## num_events.append(str(attrs["COUNT_STORAGEELEMENT"]))
2553 ## xml.sax.parseString(api_result, Handler())
2554 ## except SAXParseException:
2555 ## raise Error("ERROR: Could not parse DBS server output")
2556 
2557 ## # DEBUG DEBUG DEBUG
2558 ## assert len(num_events) == 1
2559 ## # DEBUG DEBUG DEBUG end
2560 
2561 ## num_events = int(num_events[0])
2562 
2563 ## # End of dbs_resolve_dataset_number_of_events.
2564 ## return num_events
2565 
2566  ##########
2567 
2568  def dbs_resolve_runs(self, dataset_name):
2569  """Ask DBS for the list of runs in a given dataset.
2570 
2571  # NOTE: This does not (yet?) skip/remove empty runs. There is
2572  # a bug in the DBS entry run.numevents (i.e. it always returns
2573  # zero) which should be fixed in the `next DBS release'.
2574  # See also:
2575  # https://savannah.cern.ch/bugs/?53452
2576  # https://savannah.cern.ch/bugs/?53711
2577 
2578  """
2579 
2580  # TODO TODO TODO
2581  # We should remove empty runs as soon as the above mentioned
2582  # bug is fixed.
2583  # TODO TODO TODO end
2584 
2585  # DEBUG DEBUG DEBUG
2586  # If we get here DBS should have been set up already.
2587  assert not self.dbs_api is None
2588  # DEBUG DEBUG DEBUG end
2589 
2590  api = self.dbs_api
2591  dbs_query = "find run where dataset = %s " \
2592  "and dataset.status = VALID" % \
2593  dataset_name
2594  try:
2595  api_result = api.executeQuery(dbs_query)
2596  except DBSAPI.dbsApiException.DbsApiException:
2597  msg = "ERROR: Could not execute DBS query"
2598  self.logger.fatal(msg)
2599  raise Error(msg)
2600 
2601  handler = DBSXMLHandler(["run"])
2602  parser = xml.sax.make_parser()
2603  parser.setContentHandler(handler)
2604 
2605  try:
2606  xml.sax.parseString(api_result, handler)
2607  except SAXParseException:
2608  msg = "ERROR: Could not parse DBS server output"
2609  self.logger.fatal(msg)
2610  raise Error(msg)
2611 
2612  # DEBUG DEBUG DEBUG
2613  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2614  # DEBUG DEBUG DEBUG end
2615 
2616  runs = handler.results.values()[0]
2617  # Turn strings into integers.
2618  runs = sorted([int(i) for i in runs])
2619 
2620  # End of dbs_resolve_runs.
2621  return runs
2622 
2623  ##########
2624 
2625  def dbs_resolve_globaltag(self, dataset_name):
2626  """Ask DBS for the globaltag corresponding to a given dataset.
2627 
2628  # BUG BUG BUG
2629  # This does not seem to work for data datasets? E.g. for
2630  # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
2631  # Probaly due to the fact that the GlobalTag changed during
2632  # datataking...
2633  BUG BUG BUG end
2634 
2635  """
2636 
2637  # DEBUG DEBUG DEBUG
2638  # If we get here DBS should have been set up already.
2639  assert not self.dbs_api is None
2640  # DEBUG DEBUG DEBUG end
2641 
2642  api = self.dbs_api
2643  dbs_query = "find dataset.tag where dataset = %s " \
2644  "and dataset.status = VALID" % \
2645  dataset_name
2646  try:
2647  api_result = api.executeQuery(dbs_query)
2648  except DBSAPI.dbsApiException.DbsApiException:
2649  msg = "ERROR: Could not execute DBS query"
2650  self.logger.fatal(msg)
2651  raise Error(msg)
2652 
2653  handler = DBSXMLHandler(["dataset.tag"])
2654  parser = xml.sax.make_parser()
2655  parser.setContentHandler(parser)
2656 
2657  try:
2658  xml.sax.parseString(api_result, handler)
2659  except SAXParseException:
2660  msg = "ERROR: Could not parse DBS server output"
2661  self.logger.fatal(msg)
2662  raise Error(msg)
2663 
2664  # DEBUG DEBUG DEBUG
2665  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2666  # DEBUG DEBUG DEBUG end
2667 
2668  globaltag = handler.results.values()[0]
2669 
2670  # DEBUG DEBUG DEBUG
2671  assert len(globaltag) == 1
2672  # DEBUG DEBUG DEBUG end
2673 
2674  globaltag = globaltag[0]
2675 
2676  # End of dbs_resolve_globaltag.
2677  return globaltag
2678 
2679  ##########
2680 
2681  def dbs_resolve_datatype(self, dataset_name):
2682  """Ask DBS for the the data type (data or mc) of a given
2683  dataset.
2684 
2685  """
2686 
2687  # DEBUG DEBUG DEBUG
2688  # If we get here DBS should have been set up already.
2689  assert not self.dbs_api is None
2690  # DEBUG DEBUG DEBUG end
2691 
2692  api = self.dbs_api
2693  dbs_query = "find datatype.type where dataset = %s " \
2694  "and dataset.status = VALID" % \
2695  dataset_name
2696  try:
2697  api_result = api.executeQuery(dbs_query)
2698  except DBSAPI.dbsApiException.DbsApiException:
2699  msg = "ERROR: Could not execute DBS query"
2700  self.logger.fatal(msg)
2701  raise Error(msg)
2702 
2703  handler = DBSXMLHandler(["datatype.type"])
2704  parser = xml.sax.make_parser()
2705  parser.setContentHandler(handler)
2706 
2707  try:
2708  xml.sax.parseString(api_result, handler)
2709  except SAXParseException:
2710  msg = "ERROR: Could not parse DBS server output"
2711  self.logger.fatal(msg)
2712  raise Error(msg)
2713 
2714  # DEBUG DEBUG DEBUG
2715  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2716  # DEBUG DEBUG DEBUG end
2717 
2718  datatype = handler.results.values()[0]
2719 
2720  # DEBUG DEBUG DEBUG
2721  assert len(datatype) == 1
2722  # DEBUG DEBUG DEBUG end
2723 
2724  datatype = datatype[0]
2725 
2726  # End of dbs_resolve_datatype.
2727  return datatype
2728 
2729  ##########
2730 
2731  # OBSOLETE OBSOLETE OBSOLETE
2732  # This method is no longer used.
2733 
2734  def dbs_resolve_number_of_events(self, dataset_name, run_number=None):
2735  """Determine the number of events in a given dataset (and run).
2736 
2737  Ask DBS for the number of events in a dataset. If a run number
2738  is specified the number of events returned is that in that run
2739  of that dataset. If problems occur we throw an exception.
2740 
2741  # BUG BUG BUG
2742  # Since DBS does not return the number of events correctly,
2743  # neither for runs nor for whole datasets, we have to work
2744  # around that a bit...
2745  # BUG BUG BUG end
2746 
2747  """
2748 
2749  # DEBUG DEBUG DEBUG
2750  # If we get here DBS should have been set up already.
2751  assert not self.dbs_api is None
2752  # DEBUG DEBUG DEBUG end
2753 
2754  api = self.dbs_api
2755  dbs_query = "find file.name, file.numevents where dataset = %s " \
2756  "and dataset.status = VALID" % \
2757  dataset_name
2758  if not run_number is None:
2759  dbs_query = dbq_query + (" and run = %d" % run_number)
2760  try:
2761  api_result = api.executeQuery(dbs_query)
2762  except DBSAPI.dbsApiException.DbsApiException:
2763  msg = "ERROR: Could not execute DBS query"
2764  self.logger.fatal(msg)
2765  raise Error(msg)
2766 
2767  handler = DBSXMLHandler(["file.name", "file.numevents"])
2768  parser = xml.sax.make_parser()
2769  parser.setContentHandler(handler)
2770 
2771  try:
2772  xml.sax.parseString(api_result, handler)
2773  except SAXParseException:
2774  msg = "ERROR: Could not parse DBS server output"
2775  self.logger.fatal(msg)
2776  raise Error(msg)
2777 
2778  # DEBUG DEBUG DEBUG
2779  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2780  # DEBUG DEBUG DEBUG end
2781 
2782  num_events = sum(handler.results["file.numevents"])
2783 
2784  # End of dbs_resolve_number_of_events.
2785  return num_events
2786 
2787  # OBSOLETE OBSOLETE OBSOLETE end
2788 
2789  ##########
2790 
2791 ## def dbs_resolve_dataset_number_of_sites(self, dataset_name):
2792 ## """Ask DBS across how many sites this dataset has been spread
2793 ## out.
2794 
2795 ## This is especially useful to check that we do not submit a job
2796 ## supposed to run on a complete sample that is not contained at
2797 ## a single site.
2798 
2799 ## """
2800 
2801 ## # DEBUG DEBUG DEBUG
2802 ## # If we get here DBS should have been set up already.
2803 ## assert not self.dbs_api is None
2804 ## # DEBUG DEBUG DEBUG end
2805 
2806 ## api = self.dbs_api
2807 ## dbs_query = "find count(site) where dataset = %s " \
2808 ## "and dataset.status = VALID" % \
2809 ## dataset_name
2810 ## try:
2811 ## api_result = api.executeQuery(dbs_query)
2812 ## except DbsApiException:
2813 ## raise Error("ERROR: Could not execute DBS query")
2814 
2815 ## try:
2816 ## num_sites = []
2817 ## class Handler(xml.sax.handler.ContentHandler):
2818 ## def startElement(self, name, attrs):
2819 ## if name == "result":
2820 ## num_sites.append(str(attrs["COUNT_STORAGEELEMENT"]))
2821 ## xml.sax.parseString(api_result, Handler())
2822 ## except SAXParseException:
2823 ## raise Error("ERROR: Could not parse DBS server output")
2824 
2825 ## # DEBUG DEBUG DEBUG
2826 ## assert len(num_sites) == 1
2827 ## # DEBUG DEBUG DEBUG end
2828 
2829 ## num_sites = int(num_sites[0])
2830 
2831 ## # End of dbs_resolve_dataset_number_of_sites.
2832 ## return num_sites
2833 
2834  ##########
2835 
2836 ## def dbs_check_dataset_spread(self, dataset_name):
2837 ## """Figure out across how many sites this dataset is spread.
2838 
2839 ## NOTE: This is something we need to figure out per run, since
2840 ## we want to submit harvesting jobs per run.
2841 
2842 ## Basically three things can happen with a given dataset:
2843 ## - the whole dataset is available on a single site,
2844 ## - the whole dataset is available (mirrored) at multiple sites,
2845 ## - the dataset is spread across multiple sites and there is no
2846 ## single site containing the full dataset in one place.
2847 
2848 ## NOTE: If all goes well, it should not be possible that
2849 ## anything but a _full_ dataset is mirrored. So we ignore the
2850 ## possibility in which for example one site contains the full
2851 ## dataset and two others mirror half of it.
2852 ## ANOTHER NOTE: According to some people this last case _could_
2853 ## actually happen. I will not design for it, but make sure it
2854 ## ends up as a false negative, in which case we just loose some
2855 ## efficiency and treat the dataset (unnecessarily) as
2856 ## spread-out.
2857 
2858 ## We don't really care about the first two possibilities, but in
2859 ## the third case we need to make sure to run the harvesting in
2860 ## two-step mode.
2861 
2862 ## This method checks with DBS which of the above cases is true
2863 ## for the dataset name given, and returns a 1 for the first two
2864 ## cases, and the number of sites across which the dataset is
2865 ## spread for the third case.
2866 
2867 ## The way in which this is done is by asking how many files each
2868 ## site has for the dataset. In the first case there is only one
2869 ## site, in the second case all sites should have the same number
2870 ## of files (i.e. the total number of files in the dataset) and
2871 ## in the third case the file counts from all sites should add up
2872 ## to the total file count for the dataset.
2873 
2874 ## """
2875 
2876 ## # DEBUG DEBUG DEBUG
2877 ## # If we get here DBS should have been set up already.
2878 ## assert not self.dbs_api is None
2879 ## # DEBUG DEBUG DEBUG end
2880 
2881 ## api = self.dbs_api
2882 ## dbs_query = "find run, run.numevents, site, file.count " \
2883 ## "where dataset = %s " \
2884 ## "and dataset.status = VALID" % \
2885 ## dataset_name
2886 ## try:
2887 ## api_result = api.executeQuery(dbs_query)
2888 ## except DbsApiException:
2889 ## msg = "ERROR: Could not execute DBS query"
2890 ## self.logger.fatal(msg)
2891 ## raise Error(msg)
2892 
2893 ## # Index things by run number. No cross-check is done to make
2894 ## # sure we get results for each and every run in the
2895 ## # dataset. I'm not sure this would make sense since we'd be
2896 ## # cross-checking DBS info with DBS info anyway. Note that we
2897 ## # use the file count per site to see if we're dealing with an
2898 ## # incomplete vs. a mirrored dataset.
2899 ## sample_info = {}
2900 ## try:
2901 ## class Handler(xml.sax.handler.ContentHandler):
2902 ## def startElement(self, name, attrs):
2903 ## if name == "result":
2904 ## run_number = int(attrs["RUNS_RUNNUMBER"])
2905 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
2906 ## file_count = int(attrs["COUNT_FILES"])
2907 ## # BUG BUG BUG
2908 ## # Doh! For some reason DBS never returns any other
2909 ## # event count than zero.
2910 ## event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
2911 ## # BUG BUG BUG end
2912 ## info = (site_name, file_count, event_count)
2913 ## try:
2914 ## sample_info[run_number].append(info)
2915 ## except KeyError:
2916 ## sample_info[run_number] = [info]
2917 ## xml.sax.parseString(api_result, Handler())
2918 ## except SAXParseException:
2919 ## msg = "ERROR: Could not parse DBS server output"
2920 ## self.logger.fatal(msg)
2921 ## raise Error(msg)
2922 
2923 ## # Now translate this into a slightly more usable mapping.
2924 ## sites = {}
2925 ## for (run_number, site_info) in six.iteritems(sample_info):
2926 ## # Quick-n-dirty trick to see if all file counts are the
2927 ## # same.
2928 ## unique_file_counts = set([i[1] for i in site_info])
2929 ## if len(unique_file_counts) == 1:
2930 ## # Okay, so this must be a mirrored dataset.
2931 ## # We have to pick one but we have to be careful. We
2932 ## # cannot submit to things like a T0, a T1, or CAF.
2933 ## site_names = [self.pick_a_site([i[0] for i in site_info])]
2934 ## nevents = [site_info[0][2]]
2935 ## else:
2936 ## # Looks like this is a spread-out sample.
2937 ## site_names = [i[0] for i in site_info]
2938 ## nevents = [i[2] for i in site_info]
2939 ## sites[run_number] = zip(site_names, nevents)
2940 
2941 ## self.logger.debug("Sample `%s' spread is:" % dataset_name)
2942 ## run_numbers = sites.keys()
2943 ## run_numbers.sort()
2944 ## for run_number in run_numbers:
2945 ## self.logger.debug(" run # %6d: %d sites (%s)" % \
2946 ## (run_number,
2947 ## len(sites[run_number]),
2948 ## ", ".join([i[0] for i in sites[run_number]])))
2949 
2950 ## # End of dbs_check_dataset_spread.
2951 ## return sites
2952 
2953 ## # DEBUG DEBUG DEBUG
2954 ## # Just kept for debugging now.
2955 ## def dbs_check_dataset_spread_old(self, dataset_name):
2956 ## """Figure out across how many sites this dataset is spread.
2957 
2958 ## NOTE: This is something we need to figure out per run, since
2959 ## we want to submit harvesting jobs per run.
2960 
2961 ## Basically three things can happen with a given dataset:
2962 ## - the whole dataset is available on a single site,
2963 ## - the whole dataset is available (mirrored) at multiple sites,
2964 ## - the dataset is spread across multiple sites and there is no
2965 ## single site containing the full dataset in one place.
2966 
2967 ## NOTE: If all goes well, it should not be possible that
2968 ## anything but a _full_ dataset is mirrored. So we ignore the
2969 ## possibility in which for example one site contains the full
2970 ## dataset and two others mirror half of it.
2971 ## ANOTHER NOTE: According to some people this last case _could_
2972 ## actually happen. I will not design for it, but make sure it
2973 ## ends up as a false negative, in which case we just loose some
2974 ## efficiency and treat the dataset (unnecessarily) as
2975 ## spread-out.
2976 
2977 ## We don't really care about the first two possibilities, but in
2978 ## the third case we need to make sure to run the harvesting in
2979 ## two-step mode.
2980 
2981 ## This method checks with DBS which of the above cases is true
2982 ## for the dataset name given, and returns a 1 for the first two
2983 ## cases, and the number of sites across which the dataset is
2984 ## spread for the third case.
2985 
2986 ## The way in which this is done is by asking how many files each
2987 ## site has for the dataset. In the first case there is only one
2988 ## site, in the second case all sites should have the same number
2989 ## of files (i.e. the total number of files in the dataset) and
2990 ## in the third case the file counts from all sites should add up
2991 ## to the total file count for the dataset.
2992 
2993 ## """
2994 
2995 ## # DEBUG DEBUG DEBUG
2996 ## # If we get here DBS should have been set up already.
2997 ## assert not self.dbs_api is None
2998 ## # DEBUG DEBUG DEBUG end
2999 
3000 ## api = self.dbs_api
3001 ## dbs_query = "find run, run.numevents, site, file.count " \
3002 ## "where dataset = %s " \
3003 ## "and dataset.status = VALID" % \
3004 ## dataset_name
3005 ## try:
3006 ## api_result = api.executeQuery(dbs_query)
3007 ## except DbsApiException:
3008 ## msg = "ERROR: Could not execute DBS query"
3009 ## self.logger.fatal(msg)
3010 ## raise Error(msg)
3011 
3012 ## # Index things by run number. No cross-check is done to make
3013 ## # sure we get results for each and every run in the
3014 ## # dataset. I'm not sure this would make sense since we'd be
3015 ## # cross-checking DBS info with DBS info anyway. Note that we
3016 ## # use the file count per site to see if we're dealing with an
3017 ## # incomplete vs. a mirrored dataset.
3018 ## sample_info = {}
3019 ## try:
3020 ## class Handler(xml.sax.handler.ContentHandler):
3021 ## def startElement(self, name, attrs):
3022 ## if name == "result":
3023 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3024 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
3025 ## file_count = int(attrs["COUNT_FILES"])
3026 ## # BUG BUG BUG
3027 ## # Doh! For some reason DBS never returns any other
3028 ## # event count than zero.
3029 ## event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
3030 ## # BUG BUG BUG end
3031 ## info = (site_name, file_count, event_count)
3032 ## try:
3033 ## sample_info[run_number].append(info)
3034 ## except KeyError:
3035 ## sample_info[run_number] = [info]
3036 ## xml.sax.parseString(api_result, Handler())
3037 ## except SAXParseException:
3038 ## msg = "ERROR: Could not parse DBS server output"
3039 ## self.logger.fatal(msg)
3040 ## raise Error(msg)
3041 
3042 ## # Now translate this into a slightly more usable mapping.
3043 ## sites = {}
3044 ## for (run_number, site_info) in six.iteritems(sample_info):
3045 ## # Quick-n-dirty trick to see if all file counts are the
3046 ## # same.
3047 ## unique_file_counts = set([i[1] for i in site_info])
3048 ## if len(unique_file_counts) == 1:
3049 ## # Okay, so this must be a mirrored dataset.
3050 ## # We have to pick one but we have to be careful. We
3051 ## # cannot submit to things like a T0, a T1, or CAF.
3052 ## site_names = [self.pick_a_site([i[0] for i in site_info])]
3053 ## nevents = [site_info[0][2]]
3054 ## else:
3055 ## # Looks like this is a spread-out sample.
3056 ## site_names = [i[0] for i in site_info]
3057 ## nevents = [i[2] for i in site_info]
3058 ## sites[run_number] = zip(site_names, nevents)
3059 
3060 ## self.logger.debug("Sample `%s' spread is:" % dataset_name)
3061 ## run_numbers = sites.keys()
3062 ## run_numbers.sort()
3063 ## for run_number in run_numbers:
3064 ## self.logger.debug(" run # %6d: %d site(s) (%s)" % \
3065 ## (run_number,
3066 ## len(sites[run_number]),
3067 ## ", ".join([i[0] for i in sites[run_number]])))
3068 
3069 ## # End of dbs_check_dataset_spread_old.
3070 ## return sites
3071 ## # DEBUG DEBUG DEBUG end
3072 
3073  ##########
3074 
3075  def dbs_check_dataset_spread(self, dataset_name):
3076  """Figure out the number of events in each run of this dataset.
3077 
3078  This is a more efficient way of doing this than calling
3079  dbs_resolve_number_of_events for each run.
3080 
3081  """
3082 
3083  self.logger.debug("Checking spread of dataset `%s'" % dataset_name)
3084 
3085  # DEBUG DEBUG DEBUG
3086  # If we get here DBS should have been set up already.
3087  assert not self.dbs_api is None
3088  # DEBUG DEBUG DEBUG end
3089 
3090  api = self.dbs_api
3091  dbs_query = "find run.number, site, file.name, file.numevents " \
3092  "where dataset = %s " \
3093  "and dataset.status = VALID" % \
3094  dataset_name
3095  try:
3096  api_result = api.executeQuery(dbs_query)
3097  except DBSAPI.dbsApiException.DbsApiException:
3098  msg = "ERROR: Could not execute DBS query"
3099  self.logger.fatal(msg)
3100  raise Error(msg)
3101 
3102  handler = DBSXMLHandler(["run.number", "site", "file.name", "file.numevents"])
3103  parser = xml.sax.make_parser()
3104  parser.setContentHandler(handler)
3105 
3106  try:
3107  # OBSOLETE OBSOLETE OBSOLETE
3108 ## class Handler(xml.sax.handler.ContentHandler):
3109 ## def startElement(self, name, attrs):
3110 ## if name == "result":
3111 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
3112 ## # TODO TODO TODO
3113 ## # Ugly hack to get around cases like this:
3114 ## # $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3115 ## # Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3116 ## # Processing ... \
3117 ## # PATH STORAGEELEMENT_SENAME COUNT_FILES
3118 ## # _________________________________________________________________________________
3119 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO 1
3120 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO cmssrm.fnal.gov 12
3121 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO srm-cms.cern.ch 12
3122 ## if len(site_name) < 1:
3123 ## return
3124 ## # TODO TODO TODO end
3125 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3126 ## file_name = str(attrs["FILES_LOGICALFILENAME"])
3127 ## nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3128 ## # I know, this is a bit of a kludge.
3129 ## if not files_info.has_key(run_number):
3130 ## # New run.
3131 ## files_info[run_number] = {}
3132 ## files_info[run_number][file_name] = (nevents,
3133 ## [site_name])
3134 ## elif not files_info[run_number].has_key(file_name):
3135 ## # New file for a known run.
3136 ## files_info[run_number][file_name] = (nevents,
3137 ## [site_name])
3138 ## else:
3139 ## # New entry for a known file for a known run.
3140 ## # DEBUG DEBUG DEBUG
3141 ## # Each file should have the same number of
3142 ## # events independent of the site it's at.
3143 ## assert nevents == files_info[run_number][file_name][0]
3144 ## # DEBUG DEBUG DEBUG end
3145 ## files_info[run_number][file_name][1].append(site_name)
3146  # OBSOLETE OBSOLETE OBSOLETE end
3147  xml.sax.parseString(api_result, handler)
3148  except SAXParseException:
3149  msg = "ERROR: Could not parse DBS server output"
3150  self.logger.fatal(msg)
3151  raise Error(msg)
3152 
3153  # DEBUG DEBUG DEBUG
3154  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
3155  # DEBUG DEBUG DEBUG end
3156 
3157  # Now reshuffle all results a bit so we can more easily use
3158  # them later on. (Remember that all arrays in the results
3159  # should have equal length.)
3160  files_info = {}
3161  for (index, site_name) in enumerate(handler.results["site"]):
3162  # Ugly hack to get around cases like this:
3163  # $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3164  # Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3165  # Processing ... \
3166  # PATH STORAGEELEMENT_SENAME COUNT_FILES
3167  # _________________________________________________________________________________
3168  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO 1
3169  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO cmssrm.fnal.gov 12
3170  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO srm-cms.cern.ch 12
3171  if len(site_name) < 1:
3172  continue
3173  run_number = int(handler.results["run.number"][index])
3174  file_name = handler.results["file.name"][index]
3175  nevents = int(handler.results["file.numevents"][index])
3176 
3177  # I know, this is a bit of a kludge.
3178  if run_number not in files_info:
3179  # New run.
3180  files_info[run_number] = {}
3181  files_info[run_number][file_name] = (nevents,
3182  [site_name])
3183  elif file_name not in files_info[run_number]:
3184  # New file for a known run.
3185  files_info[run_number][file_name] = (nevents,
3186  [site_name])
3187  else:
3188  # New entry for a known file for a known run.
3189  # DEBUG DEBUG DEBUG
3190  # Each file should have the same number of
3191  # events independent of the site it's at.
3192  assert nevents == files_info[run_number][file_name][0]
3193  # DEBUG DEBUG DEBUG end
3194  files_info[run_number][file_name][1].append(site_name)
3195 
3196  # Remove any information for files that are not available
3197  # anywhere. NOTE: After introducing the ugly hack above, this
3198  # is a bit redundant, but let's keep it for the moment.
3199  for run_number in files_info.keys():
3200  files_without_sites = [i for (i, j) in \
3201  files_info[run_number].items() \
3202  if len(j[1]) < 1]
3203  if len(files_without_sites) > 0:
3204  self.logger.warning("Removing %d file(s)" \
3205  " with empty site names" % \
3206  len(files_without_sites))
3207  for file_name in files_without_sites:
3208  del files_info[run_number][file_name]
3209  # files_info[run_number][file_name] = (files_info \
3210  # [run_number] \
3211  # [file_name][0], [])
3212 
3213  # And another bit of a kludge.
3214  num_events_catalog = {}
3215  for run_number in files_info.keys():
3216  site_names = list(set([j for i in files_info[run_number].values() for j in i[1]]))
3217 
3218  # NOTE: The term `mirrored' does not have the usual
3219  # meaning here. It basically means that we can apply
3220  # single-step harvesting.
3221  mirrored = None
3222  if len(site_names) > 1:
3223  # Now we somehow need to figure out if we're dealing
3224  # with a mirrored or a spread-out dataset. The rule we
3225  # use here is that we're dealing with a spread-out
3226  # dataset unless we can find at least one site
3227  # containing exactly the full list of files for this
3228  # dataset that DBS knows about. In that case we just
3229  # use only that site.
3230  all_file_names = files_info[run_number].keys()
3231  all_file_names = set(all_file_names)
3232  sites_with_complete_copies = []
3233  for site_name in site_names:
3234  files_at_site = [i for (i, (j, k)) \
3235  in files_info[run_number].items() \
3236  if site_name in k]
3237  files_at_site = set(files_at_site)
3238  if files_at_site == all_file_names:
3239  sites_with_complete_copies.append(site_name)
3240  if len(sites_with_complete_copies) < 1:
3241  # This dataset/run is available at more than one
3242  # site, but no one has a complete copy. So this is
3243  # a spread-out sample.
3244  mirrored = False
3245  else:
3246  if len(sites_with_complete_copies) > 1:
3247  # This sample is available (and complete) at
3248  # more than one site. Definitely mirrored.
3249  mirrored = True
3250  else:
3251  # This dataset/run is available at more than
3252  # one site and at least one of them has a
3253  # complete copy. Even if this is only a single
3254  # site, let's call this `mirrored' and run the
3255  # single-step harvesting.
3256  mirrored = True
3257 
3258 ## site_names_ref = set(files_info[run_number].values()[0][1])
3259 ## for site_names_tmp in files_info[run_number].values()[1:]:
3260 ## if set(site_names_tmp[1]) != site_names_ref:
3261 ## mirrored = False
3262 ## break
3263 
3264  if mirrored:
3265  self.logger.debug(" -> run appears to be `mirrored'")
3266  else:
3267  self.logger.debug(" -> run appears to be spread-out")
3268 
3269  if mirrored and \
3270  len(sites_with_complete_copies) != len(site_names):
3271  # Remove any references to incomplete sites if we
3272  # have at least one complete site (and if there
3273  # are incomplete sites).
3274  for (file_name, (i, sites)) in files_info[run_number].items():
3275  complete_sites = [site for site in sites \
3276  if site in sites_with_complete_copies]
3277  files_info[run_number][file_name] = (i, complete_sites)
3278  site_names = sites_with_complete_copies
3279 
3280  self.logger.debug(" for run #%d:" % run_number)
3281  num_events_catalog[run_number] = {}
3282  num_events_catalog[run_number]["all_sites"] = sum([i[0] for i in files_info[run_number].values()])
3283  if len(site_names) < 1:
3284  self.logger.debug(" run is not available at any site")
3285  self.logger.debug(" (but should contain %d events" % \
3286  num_events_catalog[run_number]["all_sites"])
3287  else:
3288  self.logger.debug(" at all sites combined there are %d events" % \
3289  num_events_catalog[run_number]["all_sites"])
3290  for site_name in site_names:
3291  num_events_catalog[run_number][site_name] = sum([i[0] for i in files_info[run_number].values() if site_name in i[1]])
3292  self.logger.debug(" at site `%s' there are %d events" % \
3293  (site_name, num_events_catalog[run_number][site_name]))
3294  num_events_catalog[run_number]["mirrored"] = mirrored
3295 
3296  # End of dbs_check_dataset_spread.
3297  return num_events_catalog
3298 
3299  # Beginning of old version.
3300 ## def dbs_check_dataset_num_events(self, dataset_name):
3301 ## """Figure out the number of events in each run of this dataset.
3302 
3303 ## This is a more efficient way of doing this than calling
3304 ## dbs_resolve_number_of_events for each run.
3305 
3306 ## # BUG BUG BUG
3307 ## # This might very well not work at all for spread-out samples. (?)
3308 ## # BUG BUG BUG end
3309 
3310 ## """
3311 
3312 ## # DEBUG DEBUG DEBUG
3313 ## # If we get here DBS should have been set up already.
3314 ## assert not self.dbs_api is None
3315 ## # DEBUG DEBUG DEBUG end
3316 
3317 ## api = self.dbs_api
3318 ## dbs_query = "find run.number, file.name, file.numevents where dataset = %s " \
3319 ## "and dataset.status = VALID" % \
3320 ## dataset_name
3321 ## try:
3322 ## api_result = api.executeQuery(dbs_query)
3323 ## except DbsApiException:
3324 ## msg = "ERROR: Could not execute DBS query"
3325 ## self.logger.fatal(msg)
3326 ## raise Error(msg)
3327 
3328 ## try:
3329 ## files_info = {}
3330 ## class Handler(xml.sax.handler.ContentHandler):
3331 ## def startElement(self, name, attrs):
3332 ## if name == "result":
3333 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3334 ## file_name = str(attrs["FILES_LOGICALFILENAME"])
3335 ## nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3336 ## try:
3337 ## files_info[run_number][file_name] = nevents
3338 ## except KeyError:
3339 ## files_info[run_number] = {file_name: nevents}
3340 ## xml.sax.parseString(api_result, Handler())
3341 ## except SAXParseException:
3342 ## msg = "ERROR: Could not parse DBS server output"
3343 ## self.logger.fatal(msg)
3344 ## raise Error(msg)
3345 
3346 ## num_events_catalog = {}
3347 ## for run_number in files_info.keys():
3348 ## num_events_catalog[run_number] = sum(files_info[run_number].values())
3349 
3350 ## # End of dbs_check_dataset_num_events.
3351 ## return num_events_catalog
3352  # End of old version.
3353 
3354  ##########
3355 
3356  def build_dataset_list(self, input_method, input_name):
3357  """Build a list of all datasets to be processed.
3358 
3359  """
3360 
3361  dataset_names = []
3362 
3363  # It may be, but only for the list of datasets to ignore, that
3364  # the input method and name are None because nothing was
3365  # specified. In that case just an empty list is returned.
3366  if input_method is None:
3367  pass
3368  elif input_method == "dataset":
3369  # Input comes from a dataset name directly on the command
3370  # line. But, this can also contain wildcards so we need
3371  # DBS to translate it conclusively into a list of explicit
3372  # dataset names.
3373  self.logger.info("Asking DBS for dataset names")
3374  dataset_names = self.dbs_resolve_dataset_name(input_name)
3375  elif input_method == "datasetfile":
3376  # In this case a file containing a list of dataset names
3377  # is specified. Still, each line may contain wildcards so
3378  # this step also needs help from DBS.
3379  # NOTE: Lines starting with a `#' are ignored.
3380  self.logger.info("Reading input from list file `%s'" % \
3381  input_name)
3382  try:
3383  listfile = open("/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name, "r")
3384  print "open listfile"
3385  for dataset in listfile:
3386  # Skip empty lines.
3387  dataset_stripped = dataset.strip()
3388  if len(dataset_stripped) < 1:
3389  continue
3390  # Skip lines starting with a `#'.
3391  if dataset_stripped[0] != "#":
3392  dataset_names.extend(self. \
3393  dbs_resolve_dataset_name(dataset_stripped))
3394  listfile.close()
3395  except IOError:
3396  msg = "ERROR: Could not open input list file `%s'" % \
3397  input_name
3398  self.logger.fatal(msg)
3399  raise Error(msg)
3400  else:
3401  # DEBUG DEBUG DEBUG
3402  # We should never get here.
3403  assert False, "Unknown input method `%s'" % input_method
3404  # DEBUG DEBUG DEBUG end
3405 
3406  # Remove duplicates from the dataset list.
3407  # NOTE: There should not be any duplicates in any list coming
3408  # from DBS, but maybe the user provided a list file with less
3409  # care.
3410  # Store for later use.
3411  dataset_names = sorted(set(dataset_names))
3412 
3413 
3414  # End of build_dataset_list.
3415  return dataset_names
3416 
3417  ##########
3418 
3420  """Build a list of datasets to process.
3421 
3422  """
3423 
3424  self.logger.info("Building list of datasets to consider...")
3425 
3426  input_method = self.input_method["datasets"]["use"]
3427  input_name = self.input_name["datasets"]["use"]
3428  dataset_names = self.build_dataset_list(input_method,
3429  input_name)
3430  self.datasets_to_use = dict(list(zip(dataset_names,
3431  [None] * len(dataset_names))))
3432 
3433  self.logger.info(" found %d dataset(s) to process:" % \
3434  len(dataset_names))
3435  for dataset in dataset_names:
3436  self.logger.info(" `%s'" % dataset)
3437 
3438  # End of build_dataset_use_list.
3439 
3440  ##########
3441 
3443  """Build a list of datasets to ignore.
3444 
3445  NOTE: We should always have a list of datasets to process, but
3446  it may be that we don't have a list of datasets to ignore.
3447 
3448  """
3449 
3450  self.logger.info("Building list of datasets to ignore...")
3451 
3452  input_method = self.input_method["datasets"]["ignore"]
3453  input_name = self.input_name["datasets"]["ignore"]
3454  dataset_names = self.build_dataset_list(input_method,
3455  input_name)
3456  self.datasets_to_ignore = dict(list(zip(dataset_names,
3457  [None] * len(dataset_names))))
3458 
3459  self.logger.info(" found %d dataset(s) to ignore:" % \
3460  len(dataset_names))
3461  for dataset in dataset_names:
3462  self.logger.info(" `%s'" % dataset)
3463 
3464  # End of build_dataset_ignore_list.
3465 
3466  ##########
3467 
3468  def build_runs_list(self, input_method, input_name):
3469 
3470  runs = []
3471 
3472  # A list of runs (either to use or to ignore) is not
3473  # required. This protects against `empty cases.'
3474  if input_method is None:
3475  pass
3476  elif input_method == "runs":
3477  # A list of runs was specified directly from the command
3478  # line.
3479  self.logger.info("Reading list of runs from the " \
3480  "command line")
3481  runs.extend([int(i.strip()) \
3482  for i in input_name.split(",") \
3483  if len(i.strip()) > 0])
3484  elif input_method == "runslistfile":
3485  # We were passed a file containing a list of runs.
3486  self.logger.info("Reading list of runs from file `%s'" % \
3487  input_name)
3488  try:
3489  listfile = open(input_name, "r")
3490  for run in listfile:
3491  # Skip empty lines.
3492  run_stripped = run.strip()
3493  if len(run_stripped) < 1:
3494  continue
3495  # Skip lines starting with a `#'.
3496  if run_stripped[0] != "#":
3497  runs.append(int(run_stripped))
3498  listfile.close()
3499  except IOError:
3500  msg = "ERROR: Could not open input list file `%s'" % \
3501  input_name
3502  self.logger.fatal(msg)
3503  raise Error(msg)
3504 
3505  else:
3506  # DEBUG DEBUG DEBUG
3507  # We should never get here.
3508  assert False, "Unknown input method `%s'" % input_method
3509  # DEBUG DEBUG DEBUG end
3510 
3511  # Remove duplicates, sort and done.
3512  runs = list(set(runs))
3513 
3514  # End of build_runs_list().
3515  return runs
3516 
3517  ##########
3518 
3520  """Build a list of runs to process.
3521 
3522  """
3523 
3524  self.logger.info("Building list of runs to consider...")
3525 
3526  input_method = self.input_method["runs"]["use"]
3527  input_name = self.input_name["runs"]["use"]
3528  runs = self.build_runs_list(input_method, input_name)
3529  self.runs_to_use = dict(list(zip(runs, [None] * len(runs))))
3530 
3531  self.logger.info(" found %d run(s) to process:" % \
3532  len(runs))
3533  if len(runs) > 0:
3534  self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3535 
3536  # End of build_runs_list().
3537 
3538  ##########
3539 
3541  """Build a list of runs to ignore.
3542 
3543  NOTE: We should always have a list of runs to process, but
3544  it may be that we don't have a list of runs to ignore.
3545 
3546  """
3547 
3548  self.logger.info("Building list of runs to ignore...")
3549 
3550  input_method = self.input_method["runs"]["ignore"]
3551  input_name = self.input_name["runs"]["ignore"]
3552  runs = self.build_runs_list(input_method, input_name)
3553  self.runs_to_ignore = dict(list(zip(runs, [None] * len(runs))))
3554 
3555  self.logger.info(" found %d run(s) to ignore:" % \
3556  len(runs))
3557  if len(runs) > 0:
3558  self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3559 
3560  # End of build_runs_ignore_list().
3561 
3562  ##########
3563 
3565  """Update the list of datasets taking into account the ones to
3566  ignore.
3567 
3568  Both lists have been generated before from DBS and both are
3569  assumed to be unique.
3570 
3571  NOTE: The advantage of creating the ignore list from DBS (in
3572  case a regexp is given) and matching that instead of directly
3573  matching the ignore criterion against the list of datasets (to
3574  consider) built from DBS is that in the former case we're sure
3575  that all regexps are treated exactly as DBS would have done
3576  without the cmsHarvester.
3577 
3578  NOTE: This only removes complete samples. Exclusion of single
3579  runs is done by the book keeping. So the assumption is that a
3580  user never wants to harvest just part (i.e. n out of N runs)
3581  of a sample.
3582 
3583  """
3584 
3585  self.logger.info("Processing list of datasets to ignore...")
3586 
3587  self.logger.debug("Before processing ignore list there are %d " \
3588  "datasets in the list to be processed" % \
3589  len(self.datasets_to_use))
3590 
3591  # Simple approach: just loop and search.
3592  dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3593  for dataset_name in self.datasets_to_use.keys():
3594  if dataset_name in self.datasets_to_ignore.keys():
3595  del dataset_names_filtered[dataset_name]
3596 
3597  self.logger.info(" --> Removed %d dataset(s)" % \
3598  (len(self.datasets_to_use) -
3599  len(dataset_names_filtered)))
3600 
3601  self.datasets_to_use = dataset_names_filtered
3602 
3603  self.logger.debug("After processing ignore list there are %d " \
3604  "datasets in the list to be processed" % \
3605  len(self.datasets_to_use))
3606 
3607  # End of process_dataset_ignore_list.
3608 
3609  ##########
3610 
3612 
3613  self.logger.info("Processing list of runs to use and ignore...")
3614 
3615  # This basically adds all runs in a dataset to be processed,
3616  # except for any runs that are not specified in the `to use'
3617  # list and any runs that are specified in the `to ignore'
3618  # list.
3619 
3620  # NOTE: It is assumed that those lists make sense. The input
3621  # should be checked against e.g. overlapping `use' and
3622  # `ignore' lists.
3623 
3624  runs_to_use = self.runs_to_use
3625  runs_to_ignore = self.runs_to_ignore
3626 
3627  for dataset_name in self.datasets_to_use:
3628  runs_in_dataset = self.datasets_information[dataset_name]["runs"]
3629 
3630  # First some sanity checks.
3631  runs_to_use_tmp = []
3632  for run in runs_to_use:
3633  if not run in runs_in_dataset:
3634  self.logger.warning("Dataset `%s' does not contain " \
3635  "requested run %d " \
3636  "--> ignoring `use' of this run" % \
3637  (dataset_name, run))
3638  else:
3639  runs_to_use_tmp.append(run)
3640 
3641  if len(runs_to_use) > 0:
3642  runs = runs_to_use_tmp
3643  self.logger.info("Using %d out of %d runs " \
3644  "of dataset `%s'" % \
3645  (len(runs), len(runs_in_dataset),
3646  dataset_name))
3647  else:
3648  runs = runs_in_dataset
3649 
3650  if len(runs_to_ignore) > 0:
3651  runs_tmp = []
3652  for run in runs:
3653  if not run in runs_to_ignore:
3654  runs_tmp.append(run)
3655  self.logger.info("Ignoring %d out of %d runs " \
3656  "of dataset `%s'" % \
3657  (len(runs)- len(runs_tmp),
3658  len(runs_in_dataset),
3659  dataset_name))
3660  runs = runs_tmp
3661 
3662  if self.todofile != "YourToDofile.txt":
3663  runs_todo = []
3664  print "Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile
3665  cmd="grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3666  (status, output)=commands.getstatusoutput(cmd)
3667  for run in runs:
3668  run_str="%s" %run
3669  if run_str in output:
3670  runs_todo.append(run)
3671  self.logger.info("Using %d runs " \
3672  "of dataset `%s'" % \
3673  (len(runs_todo),
3674  dataset_name))
3675  runs=runs_todo
3676 
3677  Json_runs = []
3678  if self.Jsonfilename != "YourJSON.txt":
3679  good_runs = []
3680  self.Jsonlumi = True
3681  # We were passed a Jsonfile containing a dictionary of
3682  # run/lunisection-pairs
3683  self.logger.info("Reading runs and lumisections from file `%s'" % \
3684  self.Jsonfilename)
3685  try:
3686  Jsonfile = open(self.Jsonfilename, "r")
3687  for names in Jsonfile:
3688  dictNames= eval(str(names))
3689  for key in dictNames:
3690  intkey=int(key)
3691  Json_runs.append(intkey)
3692  Jsonfile.close()
3693  except IOError:
3694  msg = "ERROR: Could not open Jsonfile `%s'" % \
3695  input_name
3696  self.logger.fatal(msg)
3697  raise Error(msg)
3698  for run in runs:
3699  if run in Json_runs:
3700  good_runs.append(run)
3701  self.logger.info("Using %d runs " \
3702  "of dataset `%s'" % \
3703  (len(good_runs),
3704  dataset_name))
3705  runs=good_runs
3706  if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):
3707  good_runs = []
3708  # We were passed a Jsonfile containing a dictionary of
3709  # run/lunisection-pairs
3710  self.logger.info("Reading runs from file `%s'" % \
3711  self.Jsonrunfilename)
3712  try:
3713  Jsonfile = open(self.Jsonrunfilename, "r")
3714  for names in Jsonfile:
3715  dictNames= eval(str(names))
3716  for key in dictNames:
3717  intkey=int(key)
3718  Json_runs.append(intkey)
3719  Jsonfile.close()
3720  except IOError:
3721  msg = "ERROR: Could not open Jsonfile `%s'" % \
3722  input_name
3723  self.logger.fatal(msg)
3724  raise Error(msg)
3725  for run in runs:
3726  if run in Json_runs:
3727  good_runs.append(run)
3728  self.logger.info("Using %d runs " \
3729  "of dataset `%s'" % \
3730  (len(good_runs),
3731  dataset_name))
3732  runs=good_runs
3733 
3734  self.datasets_to_use[dataset_name] = runs
3735 
3736  # End of process_runs_use_and_ignore_lists().
3737 
3738  ##########
3739 
3741  """Remove all but the largest part of all datasets.
3742 
3743  This allows us to harvest at least part of these datasets
3744  using single-step harvesting until the two-step approach
3745  works.
3746 
3747  """
3748 
3749  # DEBUG DEBUG DEBUG
3750  assert self.harvesting_mode == "single-step-allow-partial"
3751  # DEBUG DEBUG DEBUG end
3752 
3753  for dataset_name in self.datasets_to_use:
3754  for run_number in self.datasets_information[dataset_name]["runs"]:
3755  max_events = max(self.datasets_information[dataset_name]["sites"][run_number].values())
3756  sites_with_max_events = [i[0] for i in self.datasets_information[dataset_name]["sites"][run_number].items() if i[1] == max_events]
3757  self.logger.warning("Singlifying dataset `%s', " \
3758  "run %d" % \
3759  (dataset_name, run_number))
3760  cmssw_version = self.datasets_information[dataset_name] \
3761  ["cmssw_version"]
3762  selected_site = self.pick_a_site(sites_with_max_events,
3763  cmssw_version)
3764 
3765  # Let's tell the user that we're manhandling this dataset.
3766  nevents_old = self.datasets_information[dataset_name]["num_events"][run_number]
3767  self.logger.warning(" --> " \
3768  "only harvesting partial statistics: " \
3769  "%d out of %d events (5.1%f%%) " \
3770  "at site `%s'" % \
3771  (max_events,
3772  nevents_old,
3773  100. * max_events / nevents_old,
3774  selected_site))
3775  self.logger.warning("!!! Please note that the number of " \
3776  "events in the output path name will " \
3777  "NOT reflect the actual statistics in " \
3778  "the harvested results !!!")
3779 
3780  # We found the site with the highest statistics and
3781  # the corresponding number of events. (CRAB gets upset
3782  # if we ask for more events than there are at a given
3783  # site.) Now update this information in our main
3784  # datasets_information variable.
3785  self.datasets_information[dataset_name]["sites"][run_number] = {selected_site: max_events}
3786  self.datasets_information[dataset_name]["num_events"][run_number] = max_events
3787  #self.datasets_information[dataset_name]["sites"][run_number] = [selected_site]
3788 
3789  # End of singlify_datasets.
3790 
3791  ##########
3792 
3794  """Check list of dataset names for impossible ones.
3795 
3796  Two kinds of checks are done:
3797  - Checks for things that do not make sense. These lead to
3798  errors and skipped datasets.
3799  - Sanity checks. For these warnings are issued but the user is
3800  considered to be the authoritative expert.
3801 
3802  Checks performed:
3803  - The CMSSW version encoded in the dataset name should match
3804  self.cmssw_version. This is critical.
3805  - There should be some events in the dataset/run. This is
3806  critical in the sense that CRAB refuses to create jobs for
3807  zero events. And yes, this does happen in practice. E.g. the
3808  reprocessed CRAFT08 datasets contain runs with zero events.
3809  - A cursory check is performed to see if the harvesting type
3810  makes sense for the data type. This should prevent the user
3811  from inadvertently running RelVal for data.
3812  - It is not possible to run single-step harvesting jobs on
3813  samples that are not fully contained at a single site.
3814  - Each dataset/run has to be available at at least one site.
3815 
3816  """
3817 
3818  self.logger.info("Performing sanity checks on dataset list...")
3819 
3820  dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3821 
3822  for dataset_name in self.datasets_to_use.keys():
3823 
3824  # Check CMSSW version.
3825  version_from_dataset = self.datasets_information[dataset_name] \
3826  ["cmssw_version"]
3827  if version_from_dataset != self.cmssw_version:
3828  msg = " CMSSW version mismatch for dataset `%s' " \
3829  "(%s vs. %s)" % \
3830  (dataset_name,
3831  self.cmssw_version, version_from_dataset)
3832  if self.force_running:
3833  # Expert mode: just warn, then continue.
3834  self.logger.warning("%s " \
3835  "--> `force mode' active: " \
3836  "run anyway" % msg)
3837  else:
3838  del dataset_names_after_checks[dataset_name]
3839  self.logger.warning("%s " \
3840  "--> skipping" % msg)
3841  continue
3842 
3843  ###
3844 
3845  # Check that the harvesting type makes sense for the
3846  # sample. E.g. normally one would not run the DQMOffline
3847  # harvesting on Monte Carlo.
3848  # TODO TODO TODO
3849  # This should be further refined.
3850  suspicious = False
3851  datatype = self.datasets_information[dataset_name]["datatype"]
3852  if datatype == "data":
3853  # Normally only DQM harvesting is run on data.
3854  if self.harvesting_type != "DQMOffline":
3855  suspicious = True
3856  elif datatype == "mc":
3857  if self.harvesting_type == "DQMOffline":
3858  suspicious = True
3859  else:
3860  # Doh!
3861  assert False, "ERROR Impossible data type `%s' " \
3862  "for dataset `%s'" % \
3863  (datatype, dataset_name)
3864  if suspicious:
3865  msg = " Normally one does not run `%s' harvesting " \
3866  "on %s samples, are you sure?" % \
3867  (self.harvesting_type, datatype)
3868  if self.force_running:
3869  self.logger.warning("%s " \
3870  "--> `force mode' active: " \
3871  "run anyway" % msg)
3872  else:
3873  del dataset_names_after_checks[dataset_name]
3874  self.logger.warning("%s " \
3875  "--> skipping" % msg)
3876  continue
3877 
3878  # TODO TODO TODO end
3879 
3880  ###
3881 
3882  # BUG BUG BUG
3883  # For the moment, due to a problem with DBS, I cannot
3884  # figure out the GlobalTag for data by myself. (For MC
3885  # it's no problem.) This means that unless a GlobalTag was
3886  # specified from the command line, we will have to skip
3887  # any data datasets.
3888 
3889  if datatype == "data":
3890  if self.globaltag is None:
3891  msg = "For data datasets (like `%s') " \
3892  "we need a GlobalTag" % \
3893  dataset_name
3894  del dataset_names_after_checks[dataset_name]
3895  self.logger.warning("%s " \
3896  "--> skipping" % msg)
3897  continue
3898 
3899  # BUG BUG BUG end
3900 
3901  ###
3902 
3903  # Check if the GlobalTag exists and (if we're using
3904  # reference histograms) if it's ready to be used with
3905  # reference histograms.
3906  globaltag = self.datasets_information[dataset_name]["globaltag"]
3907  if not globaltag in self.globaltag_check_cache:
3908  if self.check_globaltag(globaltag):
3909  self.globaltag_check_cache.append(globaltag)
3910  else:
3911  msg = "Something is wrong with GlobalTag `%s' " \
3912  "used by dataset `%s'!" % \
3913  (globaltag, dataset_name)
3914  if self.use_ref_hists:
3915  msg += "\n(Either it does not exist or it " \
3916  "does not contain the required key to " \
3917  "be used with reference histograms.)"
3918  else:
3919  msg += "\n(It probably just does not exist.)"
3920  self.logger.fatal(msg)
3921  raise Usage(msg)
3922 
3923  ###
3924 
3925  # Require that each run is available at least somewhere.
3926  runs_without_sites = [i for (i, j) in \
3927  self.datasets_information[dataset_name] \
3928  ["sites"].items() \
3929  if len(j) < 1 and \
3930  i in self.datasets_to_use[dataset_name]]
3931  if len(runs_without_sites) > 0:
3932  for run_without_sites in runs_without_sites:
3933  try:
3934  dataset_names_after_checks[dataset_name].remove(run_without_sites)
3935  except KeyError:
3936  pass
3937  self.logger.warning(" removed %d unavailable run(s) " \
3938  "from dataset `%s'" % \
3939  (len(runs_without_sites), dataset_name))
3940  self.logger.debug(" (%s)" % \
3941  ", ".join([str(i) for i in \
3942  runs_without_sites]))
3943 
3944  ###
3945 
3946  # Unless we're running two-step harvesting: only allow
3947  # samples located on a single site.
3948  if not self.harvesting_mode == "two-step":
3949  for run_number in self.datasets_to_use[dataset_name]:
3950  # DEBUG DEBUG DEBUG
3951 ## if self.datasets_information[dataset_name]["num_events"][run_number] != 0:
3952 ## pdb.set_trace()
3953  # DEBUG DEBUG DEBUG end
3954  num_sites = len(self.datasets_information[dataset_name] \
3955  ["sites"][run_number])
3956  if num_sites > 1 and \
3957  not self.datasets_information[dataset_name] \
3958  ["mirrored"][run_number]:
3959  # Cannot do this with a single-step job, not
3960  # even in force mode. It just does not make
3961  # sense.
3962  msg = " Dataset `%s', run %d is spread across more " \
3963  "than one site.\n" \
3964  " Cannot run single-step harvesting on " \
3965  "samples spread across multiple sites" % \
3966  (dataset_name, run_number)
3967  try:
3968  dataset_names_after_checks[dataset_name].remove(run_number)
3969  except KeyError:
3970  pass
3971  self.logger.warning("%s " \
3972  "--> skipping" % msg)
3973 
3974  ###
3975 
3976  # Require that the dataset/run is non-empty.
3977  # NOTE: To avoid reconsidering empty runs/datasets next
3978  # time around, we do include them in the book keeping.
3979  # BUG BUG BUG
3980  # This should sum only over the runs that we use!
3981  tmp = [j for (i, j) in self.datasets_information \
3982  [dataset_name]["num_events"].items() \
3983  if i in self.datasets_to_use[dataset_name]]
3984  num_events_dataset = sum(tmp)
3985  # BUG BUG BUG end
3986  if num_events_dataset < 1:
3987  msg = " dataset `%s' is empty" % dataset_name
3988  del dataset_names_after_checks[dataset_name]
3989  self.logger.warning("%s " \
3990  "--> skipping" % msg)
3991  # Update the book keeping with all the runs in the dataset.
3992  # DEBUG DEBUG DEBUG
3993  #assert set([j for (i, j) in self.datasets_information \
3994  # [dataset_name]["num_events"].items() \
3995  # if i in self.datasets_to_use[dataset_name]]) == \
3996  # set([0])
3997  # DEBUG DEBUG DEBUG end
3998  #self.book_keeping_information[dataset_name] = self.datasets_information \
3999  # [dataset_name]["num_events"]
4000  continue
4001 
4002  tmp = [i for i in \
4003  self.datasets_information[dataset_name] \
4004  ["num_events"].items() if i[1] < 1]
4005  tmp = [i for i in tmp if i[0] in self.datasets_to_use[dataset_name]]
4006  empty_runs = dict(tmp)
4007  if len(empty_runs) > 0:
4008  for empty_run in empty_runs:
4009  try:
4010  dataset_names_after_checks[dataset_name].remove(empty_run)
4011  except KeyError:
4012  pass
4013  self.logger.info(" removed %d empty run(s) from dataset `%s'" % \
4014  (len(empty_runs), dataset_name))
4015  self.logger.debug(" (%s)" % \
4016  ", ".join([str(i) for i in empty_runs]))
4017 
4018  ###
4019 
4020  # If we emptied out a complete dataset, remove the whole
4021  # thing.
4022  dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4023  for (dataset_name, runs) in six.iteritems(dataset_names_after_checks):
4024  if len(runs) < 1:
4025  self.logger.warning(" Removing dataset without any runs " \
4026  "(left) `%s'" % \
4027  dataset_name)
4028  del dataset_names_after_checks_tmp[dataset_name]
4029  dataset_names_after_checks = dataset_names_after_checks_tmp
4030 
4031  ###
4032 
4033  self.logger.warning(" --> Removed %d dataset(s)" % \
4034  (len(self.datasets_to_use) -
4035  len(dataset_names_after_checks)))
4036 
4037  # Now store the modified version of the dataset list.
4038  self.datasets_to_use = dataset_names_after_checks
4039 
4040  # End of check_dataset_list.
4041 
4042  ##########
4043 
4044  def escape_dataset_name(self, dataset_name):
4045  """Escape a DBS dataset name.
4046 
4047  Escape a DBS dataset name such that it does not cause trouble
4048  with the file system. This means turning each `/' into `__',
4049  except for the first one which is just removed.
4050 
4051  """
4052 
4053  escaped_dataset_name = dataset_name
4054  escaped_dataset_name = escaped_dataset_name.strip("/")
4055  escaped_dataset_name = escaped_dataset_name.replace("/", "__")
4056 
4057  return escaped_dataset_name
4058 
4059  ##########
4060 
4061  # BUG BUG BUG
4062  # This is a bit of a redundant method, isn't it?
4063  def create_config_file_name(self, dataset_name, run_number):
4064  """Generate the name of the configuration file to be run by
4065  CRAB.
4066 
4067  Depending on the harvesting mode (single-step or two-step)
4068  this is the name of the real harvesting configuration or the
4069  name of the first-step ME summary extraction configuration.
4070 
4071  """
4072 
4073  if self.harvesting_mode == "single-step":
4074  config_file_name = self.create_harvesting_config_file_name(dataset_name)
4075  elif self.harvesting_mode == "single-step-allow-partial":
4076  config_file_name = self.create_harvesting_config_file_name(dataset_name)
4077 ## # Only add the alarming piece to the file name if this is
4078 ## # a spread-out dataset.
4079 ## pdb.set_trace()
4080 ## if self.datasets_information[dataset_name] \
4081 ## ["mirrored"][run_number] == False:
4082 ## config_file_name = config_file_name.replace(".py", "_partial.py")
4083  elif self.harvesting_mode == "two-step":
4084  config_file_name = self.create_me_summary_config_file_name(dataset_name)
4085  else:
4086  assert False, "ERROR Unknown harvesting mode `%s'" % \
4087  self.harvesting_mode
4088 
4089  # End of create_config_file_name.
4090  return config_file_name
4091  # BUG BUG BUG end
4092 
4093  ##########
4094 
4095  def create_harvesting_config_file_name(self, dataset_name):
4096  "Generate the name to be used for the harvesting config file."
4097 
4098  file_name_base = "harvesting.py"
4099  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4100  config_file_name = file_name_base.replace(".py",
4101  "_%s.py" % \
4102  dataset_name_escaped)
4103 
4104  # End of create_harvesting_config_file_name.
4105  return config_file_name
4106 
4107  ##########
4108 
4109  def create_me_summary_config_file_name(self, dataset_name):
4110  "Generate the name of the ME summary extraction config file."
4111 
4112  file_name_base = "me_extraction.py"
4113  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4114  config_file_name = file_name_base.replace(".py",
4115  "_%s.py" % \
4116  dataset_name_escaped)
4117 
4118  # End of create_me_summary_config_file_name.
4119  return config_file_name
4120 
4121  ##########
4122 
4123  def create_output_file_name(self, dataset_name, run_number=None):
4124  """Create the name of the output file name to be used.
4125 
4126  This is the name of the output file of the `first step'. In
4127  the case of single-step harvesting this is already the final
4128  harvesting output ROOT file. In the case of two-step
4129  harvesting it is the name of the intermediary ME summary
4130  file.
4131 
4132  """
4133 
4134  # BUG BUG BUG
4135  # This method has become a bit of a mess. Originally it was
4136  # nice to have one entry point for both single- and two-step
4137  # output file names. However, now the former needs the run
4138  # number, while the latter does not even know about run
4139  # numbers. This should be fixed up a bit.
4140  # BUG BUG BUG end
4141 
4142  if self.harvesting_mode == "single-step":
4143  # DEBUG DEBUG DEBUG
4144  assert not run_number is None
4145  # DEBUG DEBUG DEBUG end
4146  output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4147  elif self.harvesting_mode == "single-step-allow-partial":
4148  # DEBUG DEBUG DEBUG
4149  assert not run_number is None
4150  # DEBUG DEBUG DEBUG end
4151  output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4152  elif self.harvesting_mode == "two-step":
4153  # DEBUG DEBUG DEBUG
4154  assert run_number is None
4155  # DEBUG DEBUG DEBUG end
4156  output_file_name = self.create_me_summary_output_file_name(dataset_name)
4157  else:
4158  # This should not be possible, but hey...
4159  assert False, "ERROR Unknown harvesting mode `%s'" % \
4160  self.harvesting_mode
4161 
4162  # End of create_harvesting_output_file_name.
4163  return output_file_name
4164 
4165  ##########
4166 
4167  def create_harvesting_output_file_name(self, dataset_name, run_number):
4168  """Generate the name to be used for the harvesting output file.
4169 
4170  This harvesting output file is the _final_ ROOT output file
4171  containing the harvesting results. In case of two-step
4172  harvesting there is an intermediate ME output file as well.
4173 
4174  """
4175 
4176  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4177 
4178  # Hmmm, looking at the code for the DQMFileSaver this might
4179  # actually be the place where the first part of this file
4180  # naming scheme comes from.
4181  # NOTE: It looks like the `V0001' comes from the DQM
4182  # version. This is something that cannot be looked up from
4183  # here, so let's hope it does not change too often.
4184  output_file_name = "DQM_V0001_R%09d__%s.root" % \
4185  (run_number, dataset_name_escaped)
4186  if self.harvesting_mode.find("partial") > -1:
4187  # Only add the alarming piece to the file name if this is
4188  # a spread-out dataset.
4189  if self.datasets_information[dataset_name] \
4190  ["mirrored"][run_number] == False:
4191  output_file_name = output_file_name.replace(".root", \
4192  "_partial.root")
4193 
4194  # End of create_harvesting_output_file_name.
4195  return output_file_name
4196 
4197  ##########
4198 
4199  def create_me_summary_output_file_name(self, dataset_name):
4200  """Generate the name of the intermediate ME file name to be
4201  used in two-step harvesting.
4202 
4203  """
4204 
4205  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4206  output_file_name = "me_summary_%s.root" % \
4207  dataset_name_escaped
4208 
4209  # End of create_me_summary_output_file_name.
4210  return output_file_name
4211 
4212  ##########
4213 
4214  def create_multicrab_block_name(self, dataset_name, run_number, index):
4215  """Create the block name to use for this dataset/run number.
4216 
4217  This is what appears in the brackets `[]' in multicrab.cfg. It
4218  is used as the name of the job and to create output
4219  directories.
4220 
4221  """
4222 
4223  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4224  block_name = "%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4225 
4226  # End of create_multicrab_block_name.
4227  return block_name
4228 
4229  ##########
4230 
4232  """Create a CRAB configuration for a given job.
4233 
4234  NOTE: This is _not_ a complete (as in: submittable) CRAB
4235  configuration. It is used to store the common settings for the
4236  multicrab configuration.
4237 
4238  NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4239 
4240  NOTE: According to CRAB, you `Must define exactly two of
4241  total_number_of_events, events_per_job, or
4242  number_of_jobs.'. For single-step harvesting we force one job,
4243  for the rest we don't really care.
4244 
4245  # BUG BUG BUG
4246  # With the current version of CRAB (2.6.1), in which Daniele
4247  # fixed the behaviour of no_block_boundary for me, one _has to
4248  # specify_ the total_number_of_events and one single site in
4249  # the se_white_list.
4250  # BUG BUG BUG end
4251 
4252  """
4253 
4254  tmp = []
4255 
4256  # This is the stuff we will need to fill in.
4257  castor_prefix = self.castor_prefix
4258 
4259  tmp.append(self.config_file_header())
4260  tmp.append("")
4261 
4262  ## CRAB
4263  ##------
4264  tmp.append("[CRAB]")
4265  tmp.append("jobtype = cmssw")
4266  tmp.append("")
4267 
4268  ## GRID
4269  ##------
4270  tmp.append("[GRID]")
4271  tmp.append("virtual_organization=cms")
4272  tmp.append("")
4273 
4274  ## USER
4275  ##------
4276  tmp.append("[USER]")
4277  tmp.append("copy_data = 1")
4278  tmp.append("")
4279 
4280  ## CMSSW
4281  ##-------
4282  tmp.append("[CMSSW]")
4283  tmp.append("# This reveals data hosted on T1 sites,")
4284  tmp.append("# which is normally hidden by CRAB.")
4285  tmp.append("show_prod = 1")
4286  tmp.append("number_of_jobs = 1")
4287  if self.Jsonlumi == True:
4288  tmp.append("lumi_mask = %s" % self.Jsonfilename)
4289  tmp.append("total_number_of_lumis = -1")
4290  else:
4291  if self.harvesting_type == "DQMOffline":
4292  tmp.append("total_number_of_lumis = -1")
4293  else:
4294  tmp.append("total_number_of_events = -1")
4295  if self.harvesting_mode.find("single-step") > -1:
4296  tmp.append("# Force everything to run in one job.")
4297  tmp.append("no_block_boundary = 1")
4298  tmp.append("")
4299 
4300  ## CAF
4301  ##-----
4302  tmp.append("[CAF]")
4303 
4304  crab_config = "\n".join(tmp)
4305 
4306  # End of create_crab_config.
4307  return crab_config
4308 
4309  ##########
4310 
4312  """Create a multicrab.cfg file for all samples.
4313 
4314  This creates the contents for a multicrab.cfg file that uses
4315  the crab.cfg file (generated elsewhere) for the basic settings
4316  and contains blocks for each run of each dataset.
4317 
4318  # BUG BUG BUG
4319  # The fact that it's necessary to specify the se_white_list
4320  # and the total_number_of_events is due to our use of CRAB
4321  # version 2.6.1. This should no longer be necessary in the
4322  # future.
4323  # BUG BUG BUG end
4324 
4325  """
4326 
4327  cmd="who i am | cut -f1 -d' '"
4328  (status, output)=commands.getstatusoutput(cmd)
4329  UserName = output
4330 
4331  if self.caf_access == True:
4332  print "Extracting %s as user name" %UserName
4333 
4334  number_max_sites = self.nr_max_sites + 1
4335 
4336  multicrab_config_lines = []
4337  multicrab_config_lines.append(self.config_file_header())
4338  multicrab_config_lines.append("")
4339  multicrab_config_lines.append("[MULTICRAB]")
4340  multicrab_config_lines.append("cfg = crab.cfg")
4341  multicrab_config_lines.append("")
4342 
4343  dataset_names = sorted(self.datasets_to_use.keys())
4344 
4345  for dataset_name in dataset_names:
4346  runs = self.datasets_to_use[dataset_name]
4347  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4348  castor_prefix = self.castor_prefix
4349 
4350  for run in runs:
4351 
4352  # CASTOR output dir.
4353  castor_dir = self.datasets_information[dataset_name] \
4354  ["castor_path"][run]
4355 
4356  cmd = "rfdir %s" % castor_dir
4357  (status, output) = commands.getstatusoutput(cmd)
4358 
4359  if len(output) <= 0:
4360 
4361  # DEBUG DEBUG DEBUG
4362  # We should only get here if we're treating a
4363  # dataset/run that is fully contained at a single
4364  # site.
4365  assert (len(self.datasets_information[dataset_name] \
4366  ["sites"][run]) == 1) or \
4367  self.datasets_information[dataset_name]["mirrored"]
4368  # DEBUG DEBUG DEBUG end
4369 
4370  site_names = self.datasets_information[dataset_name] \
4371  ["sites"][run].keys()
4372 
4373  for i in range(1, number_max_sites, 1):
4374  if len(site_names) > 0:
4375  index = "site_%02d" % (i)
4376 
4377  config_file_name = self. \
4378  create_config_file_name(dataset_name, run)
4379  output_file_name = self. \
4380  create_output_file_name(dataset_name, run)
4381 
4382 
4383  # If we're looking at a mirrored dataset we just pick
4384  # one of the sites. Otherwise there is nothing to
4385  # choose.
4386 
4387  # Loop variable
4388  loop = 0
4389 
4390  if len(site_names) > 1:
4391  cmssw_version = self.datasets_information[dataset_name] \
4392  ["cmssw_version"]
4393  self.logger.info("Picking site for mirrored dataset " \
4394  "`%s', run %d" % \
4395  (dataset_name, run))
4396  site_name = self.pick_a_site(site_names, cmssw_version)
4397  if site_name in site_names:
4398  site_names.remove(site_name)
4399 
4400  else:
4401  site_name = site_names[0]
4402  site_names.remove(site_name)
4403 
4404  if site_name is self.no_matching_site_found_str:
4405  if loop < 1:
4406  break
4407 
4408  nevents = self.datasets_information[dataset_name]["num_events"][run]
4409 
4410  # The block name.
4411  multicrab_block_name = self.create_multicrab_block_name( \
4412  dataset_name, run, index)
4413  multicrab_config_lines.append("[%s]" % \
4414  multicrab_block_name)
4415 
4416  ## CRAB
4417  ##------
4418  if site_name == "caf.cern.ch":
4419  multicrab_config_lines.append("CRAB.use_server=0")
4420  multicrab_config_lines.append("CRAB.scheduler=caf")
4421  else:
4422  multicrab_config_lines.append("scheduler = glite")
4423 
4424  ## GRID
4425  ##------
4426  if site_name == "caf.cern.ch":
4427  pass
4428  else:
4429  multicrab_config_lines.append("GRID.se_white_list = %s" % \
4430  site_name)
4431  multicrab_config_lines.append("# This removes the default blacklisting of T1 sites.")
4432  multicrab_config_lines.append("GRID.remove_default_blacklist = 1")
4433  multicrab_config_lines.append("GRID.rb = CERN")
4434  if not self.non_t1access:
4435  multicrab_config_lines.append("GRID.role = t1access")
4436 
4437  ## USER
4438  ##------
4439 
4440  castor_dir = castor_dir.replace(castor_prefix, "")
4441  multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4442  multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4443  castor_dir)
4444  multicrab_config_lines.append("USER.check_user_remote_dir=0")
4445 
4446  if site_name == "caf.cern.ch":
4447  multicrab_config_lines.append("USER.storage_path=%s" % castor_prefix)
4448  #multicrab_config_lines.append("USER.storage_element=T2_CH_CAF")
4449  #castor_dir = castor_dir.replace("/cms/store/caf/user/%s" %UserName, "")
4450  #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4451  # castor_dir)
4452  else:
4453  multicrab_config_lines.append("USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4454  #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4455  # castor_dir)
4456  #multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4457 
4458  ## CMSSW
4459  ##-------
4460  multicrab_config_lines.append("CMSSW.pset = %s" % \
4461  config_file_name)
4462  multicrab_config_lines.append("CMSSW.datasetpath = %s" % \
4463  dataset_name)
4464  multicrab_config_lines.append("CMSSW.runselection = %d" % \
4465  run)
4466 
4467  if self.Jsonlumi == True:
4468  pass
4469  else:
4470  if self.harvesting_type == "DQMOffline":
4471  pass
4472  else:
4473  multicrab_config_lines.append("CMSSW.total_number_of_events = %d" % \
4474  nevents)
4475  # The output file name.
4476  multicrab_config_lines.append("CMSSW.output_file = %s" % \
4477  output_file_name)
4478 
4479  ## CAF
4480  ##-----
4481  if site_name == "caf.cern.ch":
4482  multicrab_config_lines.append("CAF.queue=cmscaf1nd")
4483 
4484 
4485  # End of block.
4486  multicrab_config_lines.append("")
4487 
4488  loop = loop + 1
4489 
4490  self.all_sites_found = True
4491 
4492  multicrab_config = "\n".join(multicrab_config_lines)
4493 
4494  # End of create_multicrab_config.
4495  return multicrab_config
4496 
4497  ##########
4498 
4499  def check_globaltag(self, globaltag=None):
4500  """Check if globaltag exists.
4501 
4502  Check if globaltag exists as GlobalTag in the database given
4503  by self.frontier_connection_name['globaltag']. If globaltag is
4504  None, self.globaltag is used instead.
4505 
4506  If we're going to use reference histograms this method also
4507  checks for the existence of the required key in the GlobalTag.
4508 
4509  """
4510 
4511  if globaltag is None:
4512  globaltag = self.globaltag
4513 
4514  # All GlobalTags should end in `::All', right?
4515  if globaltag.endswith("::All"):
4516  globaltag = globaltag[:-5]
4517 
4518  connect_name = self.frontier_connection_name["globaltag"]
4519  # BUG BUG BUG
4520  # There is a bug in cmscond_tagtree_list: some magic is
4521  # missing from the implementation requiring one to specify
4522  # explicitly the name of the squid to connect to. Since the
4523  # cmsHarvester can only be run from the CERN network anyway,
4524  # cmsfrontier:8000 is hard-coded in here. Not nice but it
4525  # works.
4526  connect_name = connect_name.replace("frontier://",
4527  "frontier://cmsfrontier:8000/")
4528  # BUG BUG BUG end
4529  connect_name += self.db_account_name_cms_cond_globaltag()
4530 
4531  tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4532 
4533  #----------
4534 
4535  tag_contains_ref_hist_key = False
4536  if self.use_ref_hists and tag_exists:
4537  # Check for the key required to use reference histograms.
4538  tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4539 
4540  #----------
4541 
4542  if self.use_ref_hists:
4543  ret_val = tag_exists and tag_contains_ref_hist_key
4544  else:
4545  ret_val = tag_exists
4546 
4547  #----------
4548 
4549  # End of check_globaltag.
4550  return ret_val
4551 
4552  ##########
4553 
4554  def check_globaltag_exists(self, globaltag, connect_name):
4555  """Check if globaltag exists.
4556 
4557  """
4558 
4559  self.logger.info("Checking existence of GlobalTag `%s'" % \
4560  globaltag)
4561  self.logger.debug(" (Using database connection `%s')" % \
4562  connect_name)
4563 
4564  cmd = "cmscond_tagtree_list -c %s -T %s" % \
4565  (connect_name, globaltag)
4566  (status, output) = commands.getstatusoutput(cmd)
4567  if status != 0 or \
4568  output.find("error") > -1:
4569  msg = "Could not check existence of GlobalTag `%s' in `%s'" % \
4570  (globaltag, connect_name)
4571  if output.find(".ALL_TABLES not found") > -1:
4572  msg = "%s\n" \
4573  "Missing database account `%s'" % \
4574  (msg, output.split(".ALL_TABLES")[0].split()[-1])
4575  self.logger.fatal(msg)
4576  self.logger.debug("Command used:")
4577  self.logger.debug(" %s" % cmd)
4578  self.logger.debug("Output received:")
4579  self.logger.debug(output)
4580  raise Error(msg)
4581  if output.find("does not exist") > -1:
4582  self.logger.debug("GlobalTag `%s' does not exist in `%s':" % \
4583  (globaltag, connect_name))
4584  self.logger.debug("Output received:")
4585  self.logger.debug(output)
4586  tag_exists = False
4587  else:
4588  tag_exists = True
4589  self.logger.info(" GlobalTag exists? -> %s" % tag_exists)
4590 
4591  # End of check_globaltag_exists.
4592  return tag_exists
4593 
4594  ##########
4595 
4596  def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name):
4597  """Check if globaltag contains the required RefHistos key.
4598 
4599  """
4600 
4601  # Check for the key required to use reference histograms.
4602  tag_contains_key = None
4603  ref_hist_key = "RefHistos"
4604  self.logger.info("Checking existence of reference " \
4605  "histogram key `%s' in GlobalTag `%s'" % \
4606  (ref_hist_key, globaltag))
4607  self.logger.debug(" (Using database connection `%s')" % \
4608  connect_name)
4609  cmd = "cmscond_tagtree_list -c %s -T %s -n %s" % \
4610  (connect_name, globaltag, ref_hist_key)
4611  (status, output) = commands.getstatusoutput(cmd)
4612  if status != 0 or \
4613  output.find("error") > -1:
4614  msg = "Could not check existence of key `%s'" % \
4615  (ref_hist_key, connect_name)
4616  self.logger.fatal(msg)
4617  self.logger.debug("Command used:")
4618  self.logger.debug(" %s" % cmd)
4619  self.logger.debug("Output received:")
4620  self.logger.debug(" %s" % output)
4621  raise Error(msg)
4622  if len(output) < 1:
4623  self.logger.debug("Required key for use of reference " \
4624  "histograms `%s' does not exist " \
4625  "in GlobalTag `%s':" % \
4626  (ref_hist_key, globaltag))
4627  self.logger.debug("Output received:")
4628  self.logger.debug(output)
4629  tag_contains_key = False
4630  else:
4631  tag_contains_key = True
4632 
4633  self.logger.info(" GlobalTag contains `%s' key? -> %s" % \
4634  (ref_hist_key, tag_contains_key))
4635 
4636  # End of check_globaltag_contains_ref_hist_key.
4637  return tag_contains_key
4638 
4639  ##########
4640 
4641  def check_ref_hist_tag(self, tag_name):
4642  """Check the existence of tag_name in database connect_name.
4643 
4644  Check if tag_name exists as a reference histogram tag in the
4645  database given by self.frontier_connection_name['refhists'].
4646 
4647  """
4648 
4649  connect_name = self.frontier_connection_name["refhists"]
4650  connect_name += self.db_account_name_cms_cond_dqm_summary()
4651 
4652  self.logger.debug("Checking existence of reference " \
4653  "histogram tag `%s'" % \
4654  tag_name)
4655  self.logger.debug(" (Using database connection `%s')" % \
4656  connect_name)
4657 
4658  cmd = "cmscond_list_iov -c %s" % \
4659  connect_name
4660  (status, output) = commands.getstatusoutput(cmd)
4661  if status != 0:
4662  msg = "Could not check existence of tag `%s' in `%s'" % \
4663  (tag_name, connect_name)
4664  self.logger.fatal(msg)
4665  self.logger.debug("Command used:")
4666  self.logger.debug(" %s" % cmd)
4667  self.logger.debug("Output received:")
4668  self.logger.debug(output)
4669  raise Error(msg)
4670  if not tag_name in output.split():
4671  self.logger.debug("Reference histogram tag `%s' " \
4672  "does not exist in `%s'" % \
4673  (tag_name, connect_name))
4674  self.logger.debug(" Existing tags: `%s'" % \
4675  "', `".join(output.split()))
4676  tag_exists = False
4677  else:
4678  tag_exists = True
4679  self.logger.debug(" Reference histogram tag exists? " \
4680  "-> %s" % tag_exists)
4681 
4682  # End of check_ref_hist_tag.
4683  return tag_exists
4684 
4685  ##########
4686 
4687  def create_es_prefer_snippet(self, dataset_name):
4688  """Build the es_prefer snippet for the reference histograms.
4689 
4690  The building of the snippet is wrapped in some care-taking
4691  code that figures out the name of the reference histogram set
4692  and makes sure the corresponding tag exists.
4693 
4694  """
4695 
4696  # Figure out the name of the reference histograms tag.
4697  # NOTE: The existence of these tags has already been checked.
4698  ref_hist_tag_name = self.ref_hist_mappings[dataset_name]
4699 
4700  connect_name = self.frontier_connection_name["refhists"]
4701  connect_name += self.db_account_name_cms_cond_dqm_summary()
4702  record_name = "DQMReferenceHistogramRootFileRcd"
4703 
4704  # Build up the code snippet.
4705  code_lines = []
4706  code_lines.append("from CondCore.DBCommon.CondDBSetup_cfi import *")
4707  code_lines.append("process.ref_hist_source = cms.ESSource(\"PoolDBESSource\", CondDBSetup,")
4708  code_lines.append(" connect = cms.string(\"%s\")," % connect_name)
4709  code_lines.append(" toGet = cms.VPSet(cms.PSet(record = cms.string(\"%s\")," % record_name)
4710  code_lines.append(" tag = cms.string(\"%s\"))," % ref_hist_tag_name)
4711  code_lines.append(" )")
4712  code_lines.append(" )")
4713  code_lines.append("process.es_prefer_ref_hist_source = cms.ESPrefer(\"PoolDBESSource\", \"ref_hist_source\")")
4714 
4715  snippet = "\n".join(code_lines)
4716 
4717  # End of create_es_prefer_snippet.
4718  return snippet
4719 
4720  ##########
4721 
4722  def create_harvesting_config(self, dataset_name):
4723  """Create the Python harvesting configuration for harvesting.
4724 
4725  The basic configuration is created by
4726  Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4727  what cmsDriver.py does.) After that we add some specials
4728  ourselves.
4729 
4730  NOTE: On one hand it may not be nice to circumvent
4731  cmsDriver.py, on the other hand cmsDriver.py does not really
4732  do anything itself. All the real work is done by the
4733  ConfigBuilder so there is not much risk that we miss out on
4734  essential developments of cmsDriver in the future.
4735 
4736  """
4737 
4738  # Setup some options needed by the ConfigBuilder.
4739  config_options = defaultOptions
4740 
4741  # These are fixed for all kinds of harvesting jobs. Some of
4742  # them are not needed for the harvesting config, but to keep
4743  # the ConfigBuilder happy.
4744  config_options.name = "harvesting"
4745  config_options.scenario = "pp"
4746  config_options.number = 1
4747  config_options.arguments = self.ident_string()
4748  config_options.evt_type = config_options.name
4749  config_options.customisation_file = None
4750  config_options.filein = "dummy_value"
4751  config_options.filetype = "EDM"
4752  # This seems to be new in CMSSW 3.3.X, no clue what it does.
4753  config_options.gflash = "dummy_value"
4754  # This seems to be new in CMSSW 3.3.0.pre6, no clue what it
4755  # does.
4756  #config_options.himix = "dummy_value"
4757  config_options.dbsquery = ""
4758 
4759  ###
4760 
4761  # These options depend on the type of harvesting we're doing
4762  # and are stored in self.harvesting_info.
4763 
4764  config_options.step = "HARVESTING:%s" % \
4765  self.harvesting_info[self.harvesting_type] \
4766  ["step_string"]
4767  config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4768  ["beamspot"]
4769  config_options.eventcontent = self.harvesting_info \
4770  [self.harvesting_type] \
4771  ["eventcontent"]
4772  config_options.harvesting = self.harvesting_info \
4773  [self.harvesting_type] \
4774  ["harvesting"]
4775 
4776  ###
4777 
4778  # This one is required (see also above) for each dataset.
4779 
4780  datatype = self.datasets_information[dataset_name]["datatype"]
4781  config_options.isMC = (datatype.lower() == "mc")
4782  config_options.isData = (datatype.lower() == "data")
4783  globaltag = self.datasets_information[dataset_name]["globaltag"]
4784 
4785  config_options.conditions = self.format_conditions_string(globaltag)
4786 
4787  ###
4788 
4789  if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4790  # This is the case for 3.3.X.
4791  config_builder = ConfigBuilder(config_options, with_input=True)
4792  else:
4793  # This is the case in older CMSSW versions.
4794  config_builder = ConfigBuilder(config_options)
4795  config_builder.prepare(True)
4796  config_contents = config_builder.pythonCfgCode
4797 
4798  ###
4799 
4800  # Add our signature to the top of the configuration. and add
4801  # some markers to the head and the tail of the Python code
4802  # generated by the ConfigBuilder.
4803  marker_lines = []
4804  sep = "#" * 30
4805  marker_lines.append(sep)
4806  marker_lines.append("# Code between these markers was generated by")
4807  marker_lines.append("# Configuration.PyReleaseValidation." \
4808  "ConfigBuilder")
4809 
4810  marker_lines.append(sep)
4811  marker = "\n".join(marker_lines)
4812 
4813  tmp = [self.config_file_header()]
4814  tmp.append("")
4815  tmp.append(marker)
4816  tmp.append("")
4817  tmp.append(config_contents)
4818  tmp.append("")
4819  tmp.append(marker)
4820  tmp.append("")
4821  config_contents = "\n".join(tmp)
4822 
4823  ###
4824 
4825  # Now we add some stuff of our own.
4826  customisations = [""]
4827 
4828  customisations.append("# Now follow some customisations")
4829  customisations.append("")
4830  connect_name = self.frontier_connection_name["globaltag"]
4831  connect_name += self.db_account_name_cms_cond_globaltag()
4832  customisations.append("process.GlobalTag.connect = \"%s\"" % \
4833  connect_name)
4834 
4835 
4836  if self.saveByLumiSection == True:
4837  customisations.append("process.dqmSaver.saveByLumiSection = 1")
4838 
4840 
4841  customisations.append("")
4842 
4843  # About the reference histograms... For data there is only one
4844  # set of references and those are picked up automatically
4845  # based on the GlobalTag. For MC we have to do some more work
4846  # since the reference histograms to be used depend on the MC
4847  # sample at hand. In this case we glue in an es_prefer snippet
4848  # to pick up the references. We do this only for RelVals since
4849  # for MC there are no meaningful references so far.
4850 
4851  # NOTE: Due to the lack of meaningful references for
4852  # MC samples reference histograms are explicitly
4853  # switched off in this case.
4854 
4855  use_es_prefer = (self.harvesting_type == "RelVal")
4856  use_refs = use_es_prefer or \
4857  (not self.harvesting_type == "MC")
4858  # Allow global override.
4859  use_refs = use_refs and self.use_ref_hists
4860 
4861  if not use_refs:
4862  # Disable reference histograms explicitly. The histograms
4863  # are loaded by the dqmRefHistoRootFileGetter
4864  # EDAnalyzer. This analyzer can be run from several
4865  # sequences. Here we remove it from each sequence that
4866  # exists.
4867  customisations.append("print \"Not using reference histograms\"")
4868  customisations.append("if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4869  customisations.append(" for (sequence_name, sequence) in six.iteritems(process.sequences):")
4870  customisations.append(" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4871  customisations.append(" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4872  customisations.append(" sequence_name")
4873  customisations.append("process.dqmSaver.referenceHandling = \"skip\"")
4874  else:
4875  # This makes sure all reference histograms are saved to
4876  # the output ROOT file.
4877  customisations.append("process.dqmSaver.referenceHandling = \"all\"")
4878  if use_es_prefer:
4879  es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4880  customisations.append(es_prefer_snippet)
4881 
4882  # Make sure we get the `workflow' correct. As far as I can see
4883  # this is only important for the output file name.
4884  workflow_name = dataset_name
4885  if self.harvesting_mode == "single-step-allow-partial":
4886  workflow_name += "_partial"
4887  customisations.append("process.dqmSaver.workflow = \"%s\"" % \
4888  workflow_name)
4889 
4890  # BUG BUG BUG
4891  # This still does not work. The current two-step harvesting
4892  # efforts are on hold waiting for the solution to come from
4893  # elsewhere. (In this case the elsewhere is Daniele Spiga.)
4894 
4895 ## # In case this file is the second step (the real harvesting
4896 ## # step) of the two-step harvesting we have to tell it to use
4897 ## # our local files.
4898 ## if self.harvesting_mode == "two-step":
4899 ## castor_dir = self.datasets_information[dataset_name] \
4900 ## ["castor_path"][run]
4901 ## customisations.append("")
4902 ## customisations.append("# This is the second step (the real")
4903 ## customisations.append("# harvesting step) of a two-step")
4904 ## customisations.append("# harvesting procedure.")
4905 ## # BUG BUG BUG
4906 ## # To be removed in production version.
4907 ## customisations.append("import pdb")
4908 ## # BUG BUG BUG end
4909 ## customisations.append("import commands")
4910 ## customisations.append("import os")
4911 ## customisations.append("castor_dir = \"%s\"" % castor_dir)
4912 ## customisations.append("cmd = \"rfdir %s\" % castor_dir")
4913 ## customisations.append("(status, output) = commands.getstatusoutput(cmd)")
4914 ## customisations.append("if status != 0:")
4915 ## customisations.append(" print \"ERROR\"")
4916 ## customisations.append(" raise Exception, \"ERROR\"")
4917 ## customisations.append("file_names = [os.path.join(\"rfio:%s\" % path, i) for i in output.split() if i.startswith(\"EDM_summary\") and i.endswith(\".root\")]")
4918 ## #customisations.append("pdb.set_trace()")
4919 ## customisations.append("process.source.fileNames = cms.untracked.vstring(*file_names)")
4920 ## customisations.append("")
4921 
4922  # BUG BUG BUG end
4923 
4924  config_contents = config_contents + "\n".join(customisations)
4925 
4926  ###
4927 
4928  # End of create_harvesting_config.
4929  return config_contents
4930 
4931 ## ##########
4932 
4933 ## def create_harvesting_config_two_step(self, dataset_name):
4934 ## """Create the Python harvesting configuration for two-step
4935 ## harvesting.
4936 
4937 ## """
4938 
4939 ## # BUG BUG BUG
4940 ## config_contents = self.create_harvesting_config_single_step(dataset_name)
4941 ## # BUG BUG BUG end
4942 
4943 ## # End of create_harvesting_config_two_step.
4944 ## return config_contents
4945 
4946  ##########
4947 
4948  def create_me_extraction_config(self, dataset_name):
4949  """
4950 
4951  """
4952 
4953  # Big chunk of hard-coded Python. Not such a big deal since
4954  # this does not do much and is not likely to break.
4955  tmp = []
4956  tmp.append(self.config_file_header())
4957  tmp.append("")
4958  tmp.append("import FWCore.ParameterSet.Config as cms")
4959  tmp.append("")
4960  tmp.append("process = cms.Process(\"ME2EDM\")")
4961  tmp.append("")
4962  tmp.append("# Import of standard configurations")
4963  tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
4964  tmp.append("")
4965  tmp.append("# We don't really process any events, just keep this set to one to")
4966  tmp.append("# make sure things work.")
4967  tmp.append("process.maxEvents = cms.untracked.PSet(")
4968  tmp.append(" input = cms.untracked.int32(1)")
4969  tmp.append(" )")
4970  tmp.append("")
4971  tmp.append("process.options = cms.untracked.PSet(")
4972  tmp.append(" Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
4973  tmp.append(" )")
4974  tmp.append("")
4975  tmp.append("process.source = cms.Source(\"PoolSource\",")
4976  tmp.append(" processingMode = \\")
4977  tmp.append(" cms.untracked.string(\"RunsAndLumis\"),")
4978  tmp.append(" fileNames = \\")
4979  tmp.append(" cms.untracked.vstring(\"no_file_specified\")")
4980  tmp.append(" )")
4981  tmp.append("")
4982  tmp.append("# Output definition: drop everything except for the monitoring.")
4983  tmp.append("process.output = cms.OutputModule(")
4984  tmp.append(" \"PoolOutputModule\",")
4985  tmp.append(" outputCommands = \\")
4986  tmp.append(" cms.untracked.vstring(\"drop *\", \\")
4987  tmp.append(" \"keep *_MEtoEDMConverter_*_*\"),")
4988  output_file_name = self. \
4989  create_output_file_name(dataset_name)
4990  tmp.append(" fileName = \\")
4991  tmp.append(" cms.untracked.string(\"%s\")," % output_file_name)
4992  tmp.append(" dataset = cms.untracked.PSet(")
4993  tmp.append(" dataTier = cms.untracked.string(\"RECO\"),")
4994  tmp.append(" filterName = cms.untracked.string(\"\")")
4995  tmp.append(" )")
4996  tmp.append(" )")
4997  tmp.append("")
4998  tmp.append("# Additional output definition")
4999  tmp.append("process.out_step = cms.EndPath(process.output)")
5000  tmp.append("")
5001  tmp.append("# Schedule definition")
5002  tmp.append("process.schedule = cms.Schedule(process.out_step)")
5003  tmp.append("")
5004 
5005  config_contents = "\n".join(tmp)
5006 
5007  # End of create_me_extraction_config.
5008  return config_contents
5009 
5010  ##########
5011 
5012 ## def create_harvesting_config(self, dataset_name):
5013 ## """Create the Python harvesting configuration for a given job.
5014 
5015 ## NOTE: The reason to have a single harvesting configuration per
5016 ## sample is to be able to specify the GlobalTag corresponding to
5017 ## each sample. Since it has been decided that (apart from the
5018 ## prompt reco) datasets cannot contain runs with different
5019 ## GlobalTags, we don't need a harvesting config per run.
5020 
5021 ## NOTE: This is the place where we distinguish between
5022 ## single-step and two-step harvesting modes (at least for the
5023 ## Python job configuration).
5024 
5025 ## """
5026 
5027 ## ###
5028 
5029 ## if self.harvesting_mode == "single-step":
5030 ## config_contents = self.create_harvesting_config_single_step(dataset_name)
5031 ## elif self.harvesting_mode == "two-step":
5032 ## config_contents = self.create_harvesting_config_two_step(dataset_name)
5033 ## else:
5034 ## # Impossible harvesting mode, we should never get here.
5035 ## assert False, "ERROR: unknown harvesting mode `%s'" % \
5036 ## self.harvesting_mode
5037 
5038 ## ###
5039 
5040 ## # End of create_harvesting_config.
5041 ## return config_contents
5042 
5043  ##########
5044 
5046  """Write a CRAB job configuration Python file.
5047 
5048  """
5049 
5050  self.logger.info("Writing CRAB configuration...")
5051 
5052  file_name_base = "crab.cfg"
5053 
5054  # Create CRAB configuration.
5055  crab_contents = self.create_crab_config()
5056 
5057  # Write configuration to file.
5058  crab_file_name = file_name_base
5059  try:
5060  crab_file = file(crab_file_name, "w")
5061  crab_file.write(crab_contents)
5062  crab_file.close()
5063  except IOError:
5064  self.logger.fatal("Could not write " \
5065  "CRAB configuration to file `%s'" % \
5066  crab_file_name)
5067  raise Error("ERROR: Could not write to file `%s'!" % \
5068  crab_file_name)
5069 
5070  # End of write_crab_config.
5071 
5072  ##########
5073 
5075  """Write a multi-CRAB job configuration Python file.
5076 
5077  """
5078 
5079  self.logger.info("Writing multi-CRAB configuration...")
5080 
5081  file_name_base = "multicrab.cfg"
5082 
5083  # Create multi-CRAB configuration.
5084  multicrab_contents = self.create_multicrab_config()
5085 
5086  # Write configuration to file.
5087  multicrab_file_name = file_name_base
5088  try:
5089  multicrab_file = file(multicrab_file_name, "w")
5090  multicrab_file.write(multicrab_contents)
5091  multicrab_file.close()
5092  except IOError:
5093  self.logger.fatal("Could not write " \
5094  "multi-CRAB configuration to file `%s'" % \
5095  multicrab_file_name)
5096  raise Error("ERROR: Could not write to file `%s'!" % \
5097  multicrab_file_name)
5098 
5099  # End of write_multicrab_config.
5100 
5101  ##########
5102 
5103  def write_harvesting_config(self, dataset_name):
5104  """Write a harvesting job configuration Python file.
5105 
5106  NOTE: This knows nothing about single-step or two-step
5107  harvesting. That's all taken care of by
5108  create_harvesting_config.
5109 
5110  """
5111 
5112  self.logger.debug("Writing harvesting configuration for `%s'..." % \
5113  dataset_name)
5114 
5115  # Create Python configuration.
5116  config_contents = self.create_harvesting_config(dataset_name)
5117 
5118  # Write configuration to file.
5119  config_file_name = self. \
5121  try:
5122  config_file = file(config_file_name, "w")
5123  config_file.write(config_contents)
5124  config_file.close()
5125  except IOError:
5126  self.logger.fatal("Could not write " \
5127  "harvesting configuration to file `%s'" % \
5128  config_file_name)
5129  raise Error("ERROR: Could not write to file `%s'!" % \
5130  config_file_name)
5131 
5132  # End of write_harvesting_config.
5133 
5134  ##########
5135 
5136  def write_me_extraction_config(self, dataset_name):
5137  """Write an ME-extraction configuration Python file.
5138 
5139  This `ME-extraction' (ME = Monitoring Element) is the first
5140  step of the two-step harvesting.
5141 
5142  """
5143 
5144  self.logger.debug("Writing ME-extraction configuration for `%s'..." % \
5145  dataset_name)
5146 
5147  # Create Python configuration.
5148  config_contents = self.create_me_extraction_config(dataset_name)
5149 
5150  # Write configuration to file.
5151  config_file_name = self. \
5153  try:
5154  config_file = file(config_file_name, "w")
5155  config_file.write(config_contents)
5156  config_file.close()
5157  except IOError:
5158  self.logger.fatal("Could not write " \
5159  "ME-extraction configuration to file `%s'" % \
5160  config_file_name)
5161  raise Error("ERROR: Could not write to file `%s'!" % \
5162  config_file_name)
5163 
5164  # End of write_me_extraction_config.
5165 
5166  ##########
5167 
5168 
5169  def ref_hist_mappings_needed(self, dataset_name=None):
5170  """Check if we need to load and check the reference mappings.
5171 
5172  For data the reference histograms should be taken
5173  automatically from the GlobalTag, so we don't need any
5174  mappings. For RelVals we need to know a mapping to be used in
5175  the es_prefer code snippet (different references for each of
5176  the datasets.)
5177 
5178  WARNING: This implementation is a bit convoluted.
5179 
5180  """
5181 
5182  # If no dataset name given, do everything, otherwise check
5183  # only this one dataset.
5184  if not dataset_name is None:
5185  data_type = self.datasets_information[dataset_name] \
5186  ["datatype"]
5187  mappings_needed = (data_type == "mc")
5188  # DEBUG DEBUG DEBUG
5189  if not mappings_needed:
5190  assert data_type == "data"
5191  # DEBUG DEBUG DEBUG end
5192  else:
5193  tmp = [self.ref_hist_mappings_needed(dataset_name) \
5194  for dataset_name in \
5195  self.datasets_information.keys()]
5196  mappings_needed = (True in tmp)
5197 
5198  # End of ref_hist_mappings_needed.
5199  return mappings_needed
5200 
5201  ##########
5202 
5204  """Load the reference histogram mappings from file.
5205 
5206  The dataset name to reference histogram name mappings are read
5207  from a text file specified in self.ref_hist_mappings_file_name.
5208 
5209  """
5210 
5211  # DEBUG DEBUG DEBUG
5212  assert len(self.ref_hist_mappings) < 1, \
5213  "ERROR Should not be RE-loading " \
5214  "reference histogram mappings!"
5215  # DEBUG DEBUG DEBUG end
5216 
5217  self.logger.info("Loading reference histogram mappings " \
5218  "from file `%s'" % \
5219  self.ref_hist_mappings_file_name)
5220 
5221  mappings_lines = None
5222  try:
5223  mappings_file = file(self.ref_hist_mappings_file_name, "r")
5224  mappings_lines = mappings_file.readlines()
5225  mappings_file.close()
5226  except IOError:
5227  msg = "ERROR: Could not open reference histogram mapping "\
5228  "file `%s'" % self.ref_hist_mappings_file_name
5229  self.logger.fatal(msg)
5230  raise Error(msg)
5231 
5232  ##########
5233 
5234  # The format we expect is: two white-space separated pieces
5235  # per line. The first the dataset name for which the reference
5236  # should be used, the second one the name of the reference
5237  # histogram in the database.
5238 
5239  for mapping in mappings_lines:
5240  # Skip comment lines.
5241  if not mapping.startswith("#"):
5242  mapping = mapping.strip()
5243  if len(mapping) > 0:
5244  mapping_pieces = mapping.split()
5245  if len(mapping_pieces) != 2:
5246  msg = "ERROR: The reference histogram mapping " \
5247  "file contains a line I don't " \
5248  "understand:\n %s" % mapping
5249  self.logger.fatal(msg)
5250  raise Error(msg)
5251  dataset_name = mapping_pieces[0].strip()
5252  ref_hist_name = mapping_pieces[1].strip()
5253  # We don't want people to accidentally specify
5254  # multiple mappings for the same dataset. Just
5255  # don't accept those cases.
5256  if dataset_name in self.ref_hist_mappings:
5257  msg = "ERROR: The reference histogram mapping " \
5258  "file contains multiple mappings for " \
5259  "dataset `%s'."
5260  self.logger.fatal(msg)
5261  raise Error(msg)
5262 
5263  # All is well that ends well.
5264  self.ref_hist_mappings[dataset_name] = ref_hist_name
5265 
5266  ##########
5267 
5268  self.logger.info(" Successfully loaded %d mapping(s)" % \
5269  len(self.ref_hist_mappings))
5270  max_len = max([len(i) for i in self.ref_hist_mappings.keys()])
5271  for (map_from, map_to) in six.iteritems(self.ref_hist_mappings):
5272  self.logger.info(" %-*s -> %s" % \
5273  (max_len, map_from, map_to))
5274 
5275  # End of load_ref_hist_mappings.
5276 
5277  ##########
5278 
5280  """Make sure all necessary reference histograms exist.
5281 
5282  Check that for each of the datasets to be processed a
5283  reference histogram is specified and that that histogram
5284  exists in the database.
5285 
5286  NOTE: There's a little complication here. Since this whole
5287  thing was designed to allow (in principle) harvesting of both
5288  data and MC datasets in one go, we need to be careful to check
5289  the availability fof reference mappings only for those
5290  datasets that need it.
5291 
5292  """
5293 
5294  self.logger.info("Checking reference histogram mappings")
5295 
5296  for dataset_name in self.datasets_to_use:
5297  try:
5298  ref_hist_name = self.ref_hist_mappings[dataset_name]
5299  except KeyError:
5300  msg = "ERROR: No reference histogram mapping found " \
5301  "for dataset `%s'" % \
5302  dataset_name
5303  self.logger.fatal(msg)
5304  raise Error(msg)
5305 
5306  if not self.check_ref_hist_tag(ref_hist_name):
5307  msg = "Reference histogram tag `%s' " \
5308  "(used for dataset `%s') does not exist!" % \
5309  (ref_hist_name, dataset_name)
5310  self.logger.fatal(msg)
5311  raise Usage(msg)
5312 
5313  self.logger.info(" Done checking reference histogram mappings.")
5314 
5315  # End of check_ref_hist_mappings.
5316 
5317  ##########
5318 
5320  """Obtain all information on the datasets that we need to run.
5321 
5322  Use DBS to figure out all required information on our
5323  datasets, like the run numbers and the GlobalTag. All
5324  information is stored in the datasets_information member
5325  variable.
5326 
5327  """
5328 
5329  # Get a list of runs in the dataset.
5330  # NOTE: The harvesting has to be done run-by-run, so we
5331  # split up datasets based on the run numbers. Strictly
5332  # speaking this is not (yet?) necessary for Monte Carlo
5333  # since all those samples use run number 1. Still, this
5334  # general approach should work for all samples.
5335 
5336  # Now loop over all datasets in the list and process them.
5337  # NOTE: This processing has been split into several loops
5338  # to be easier to follow, sacrificing a bit of efficiency.
5339  self.datasets_information = {}
5340  self.logger.info("Collecting information for all datasets to process")
5341  dataset_names = sorted(self.datasets_to_use.keys())
5342  for dataset_name in dataset_names:
5343 
5344  # Tell the user which dataset: nice with many datasets.
5345  sep_line = "-" * 30
5346  self.logger.info(sep_line)
5347  self.logger.info(" `%s'" % dataset_name)
5348  self.logger.info(sep_line)
5349 
5350  runs = self.dbs_resolve_runs(dataset_name)
5351  self.logger.info(" found %d run(s)" % len(runs))
5352  if len(runs) > 0:
5353  self.logger.debug(" run number(s): %s" % \
5354  ", ".join([str(i) for i in runs]))
5355  else:
5356  # DEBUG DEBUG DEBUG
5357  # This should never happen after the DBS checks.
5358  self.logger.warning(" --> skipping dataset "
5359  "without any runs")
5360  assert False, "Panic: found a dataset without runs " \
5361  "after DBS checks!"
5362  # DEBUG DEBUG DEBUG end
5363 
5364  cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5365  self.logger.info(" found CMSSW version `%s'" % cmssw_version)
5366 
5367  # Figure out if this is data or MC.
5368  datatype = self.dbs_resolve_datatype(dataset_name)
5369  self.logger.info(" sample is data or MC? --> %s" % \
5370  datatype)
5371 
5372  ###
5373 
5374  # Try and figure out the GlobalTag to be used.
5375  if self.globaltag is None:
5376  globaltag = self.dbs_resolve_globaltag(dataset_name)
5377  else:
5378  globaltag = self.globaltag
5379 
5380  self.logger.info(" found GlobalTag `%s'" % globaltag)
5381 
5382  # DEBUG DEBUG DEBUG
5383  if globaltag == "":
5384  # Actually we should not even reach this point, after
5385  # our dataset sanity checks.
5386  assert datatype == "data", \
5387  "ERROR Empty GlobalTag for MC dataset!!!"
5388  # DEBUG DEBUG DEBUG end
5389 
5390  ###
5391 
5392  # DEBUG DEBUG DEBUG
5393  #tmp = self.dbs_check_dataset_spread_old(dataset_name)
5394  # DEBUG DEBUG DEBUG end
5395  sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5396 
5397  # Extract the total event counts.
5398  num_events = {}
5399  for run_number in sites_catalog.keys():
5400  num_events[run_number] = sites_catalog \
5401  [run_number]["all_sites"]
5402  del sites_catalog[run_number]["all_sites"]
5403 
5404  # Extract the information about whether or not datasets
5405  # are mirrored.
5406  mirror_catalog = {}
5407  for run_number in sites_catalog.keys():
5408  mirror_catalog[run_number] = sites_catalog \
5409  [run_number]["mirrored"]
5410  del sites_catalog[run_number]["mirrored"]
5411 
5412  # BUG BUG BUG
5413  # I think I could now get rid of that and just fill the
5414  # "sites" entry with the `inverse' of this
5415  # num_events_catalog(?).
5416  #num_sites = self.dbs_resolve_dataset_number_of_sites(dataset_name)
5417  #sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5418  #sites_catalog = dict(zip(num_events_catalog.keys(),
5419  # [[j for i in num_events_catalog.values() for j in i.keys()]]))
5420  # BUG BUG BUG end
5421 
5422 ## # DEBUG DEBUG DEBUG
5423 ## # This is probably only useful to make sure we don't muck
5424 ## # things up, right?
5425 ## # Figure out across how many sites this sample has been spread.
5426 ## if num_sites == 1:
5427 ## self.logger.info(" sample is contained at a single site")
5428 ## else:
5429 ## self.logger.info(" sample is spread across %d sites" % \
5430 ## num_sites)
5431 ## if num_sites < 1:
5432 ## # NOTE: This _should not_ happen with any valid dataset.
5433 ## self.logger.warning(" --> skipping dataset which is not " \
5434 ## "hosted anywhere")
5435 ## # DEBUG DEBUG DEBUG end
5436 
5437  # Now put everything in a place where we can find it again
5438  # if we need it.
5439  self.datasets_information[dataset_name] = {}
5440  self.datasets_information[dataset_name]["runs"] = runs
5441  self.datasets_information[dataset_name]["cmssw_version"] = \
5442  cmssw_version
5443  self.datasets_information[dataset_name]["globaltag"] = globaltag
5444  self.datasets_information[dataset_name]["datatype"] = datatype
5445  self.datasets_information[dataset_name]["num_events"] = num_events
5446  self.datasets_information[dataset_name]["mirrored"] = mirror_catalog
5447  self.datasets_information[dataset_name]["sites"] = sites_catalog
5448 
5449  # Each run of each dataset has a different CASTOR output
5450  # path.
5451  castor_path_common = self.create_castor_path_name_common(dataset_name)
5452  self.logger.info(" output will go into `%s'" % \
5453  castor_path_common)
5454 
5455  castor_paths = dict(list(zip(runs,
5456  [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5457  for i in runs])))
5458  for path_name in castor_paths.values():
5459  self.logger.debug(" %s" % path_name)
5460  self.datasets_information[dataset_name]["castor_path"] = \
5461  castor_paths
5462 
5463  # End of build_datasets_information.
5464 
5465  ##########
5466 
5468  """Tell the user what to do now, after this part is done.
5469 
5470  This should provide the user with some (preferably
5471  copy-pasteable) instructions on what to do now with the setups
5472  and files that have been created.
5473 
5474  """
5475 
5476  # TODO TODO TODO
5477  # This could be improved a bit.
5478  # TODO TODO TODO end
5479 
5480  sep_line = "-" * 60
5481 
5482  self.logger.info("")
5483  self.logger.info(sep_line)
5484  self.logger.info(" Configuration files have been created.")
5485  self.logger.info(" From here on please follow the usual CRAB instructions.")
5486  self.logger.info(" Quick copy-paste instructions are shown below.")
5487  self.logger.info(sep_line)
5488 
5489  self.logger.info("")
5490  self.logger.info(" Create all CRAB jobs:")
5491  self.logger.info(" multicrab -create")
5492  self.logger.info("")
5493  self.logger.info(" Submit all CRAB jobs:")
5494  self.logger.info(" multicrab -submit")
5495  self.logger.info("")
5496  self.logger.info(" Check CRAB status:")
5497  self.logger.info(" multicrab -status")
5498  self.logger.info("")
5499 
5500  self.logger.info("")
5501  self.logger.info(" For more information please see the CMS Twiki:")
5502  self.logger.info(" %s" % twiki_url)
5503  self.logger.info(sep_line)
5504 
5505  # If there were any jobs for which we could not find a
5506  # matching site show a warning message about that.
5507  if not self.all_sites_found:
5508  self.logger.warning(" For some of the jobs no matching " \
5509  "site could be found")
5510  self.logger.warning(" --> please scan your multicrab.cfg" \
5511  "for occurrences of `%s'." % \
5512  self.no_matching_site_found_str)
5513  self.logger.warning(" You will have to fix those " \
5514  "by hand, sorry.")
5515 
5516  # End of show_exit_message.
5517 
5518  ##########
5519 
5520  def run(self):
5521  "Main entry point of the CMS harvester."
5522 
5523  # Start with a positive thought.
5524  exit_code = 0
5525 
5526  try:
5527 
5528  try:
5529 
5530  # Parse all command line options and arguments
5531  self.parse_cmd_line_options()
5532  # and check that they make sense.
5533  self.check_input_status()
5534 
5535  # Check if CMSSW is setup.
5536  self.check_cmssw()
5537 
5538  # Check if DBS is setup,
5539  self.check_dbs()
5540  # and if all is fine setup the Python side.
5541  self.setup_dbs()
5542 
5543  # Fill our dictionary with all the required info we
5544  # need to understand harvesting jobs. This needs to be
5545  # done after the CMSSW version is known.
5546  self.setup_harvesting_info()
5547 
5548  # Obtain list of dataset names to consider
5549  self.build_dataset_use_list()
5550  # and the list of dataset names to ignore.
5551  self.build_dataset_ignore_list()
5552 
5553  # The same for the runs lists (if specified).
5554  self.build_runs_use_list()
5555  self.build_runs_ignore_list()
5556 
5557  # Process the list of datasets to ignore and fold that
5558  # into the list of datasets to consider.
5559  # NOTE: The run-based selection is done later since
5560  # right now we don't know yet which runs a dataset
5561  # contains.
5562  self.process_dataset_ignore_list()
5563 
5564  # Obtain all required information on the datasets,
5565  # like run numbers and GlobalTags.
5566  self.build_datasets_information()
5567 
5568  if self.use_ref_hists and \
5569  self.ref_hist_mappings_needed():
5570  # Load the dataset name to reference histogram
5571  # name mappings from file.
5572  self.load_ref_hist_mappings()
5573  # Now make sure that for all datasets we want to
5574  # process there is a reference defined. Otherwise
5575  # just bomb out before wasting any more time.
5576  self.check_ref_hist_mappings()
5577  else:
5578  self.logger.info("No need to load reference " \
5579  "histogram mappings file")
5580 
5581  # OBSOLETE OBSOLETE OBSOLETE
5582 ## # TODO TODO TODO
5583 ## # Need to think about where this should go, but
5584 ## # somewhere we have to move over the fact that we want
5585 ## # to process all runs for each dataset that we're
5586 ## # considering. This basically means copying over the
5587 ## # information from self.datasets_information[]["runs"]
5588 ## # to self.datasets_to_use[].
5589 ## for dataset_name in self.datasets_to_use.keys():
5590 ## self.datasets_to_use[dataset_name] = self.datasets_information[dataset_name]["runs"]
5591 ## # TODO TODO TODO end
5592  # OBSOLETE OBSOLETE OBSOLETE end
5593 
5594  self.process_runs_use_and_ignore_lists()
5595 
5596  # If we've been asked to sacrifice some parts of
5597  # spread-out samples in order to be able to partially
5598  # harvest them, we'll do that here.
5599  if self.harvesting_mode == "single-step-allow-partial":
5600  self.singlify_datasets()
5601 
5602  # Check dataset name(s)
5603  self.check_dataset_list()
5604  # and see if there is anything left to do.
5605  if len(self.datasets_to_use) < 1:
5606  self.logger.info("After all checks etc. " \
5607  "there are no datasets (left?) " \
5608  "to process")
5609  else:
5610 
5611  self.logger.info("After all checks etc. we are left " \
5612  "with %d dataset(s) to process " \
5613  "for a total of %d runs" % \
5614  (len(self.datasets_to_use),
5615  sum([len(i) for i in \
5616  self.datasets_to_use.values()])))
5617 
5618  # NOTE: The order in which things are done here is
5619  # important. At the end of the job, independent on
5620  # how it ends (exception, CTRL-C, normal end) the
5621  # book keeping is written to file. At that time it
5622  # should be clear which jobs are done and can be
5623  # submitted. This means we first create the
5624  # general files, and then the per-job config
5625  # files.
5626 
5627  # TODO TODO TODO
5628  # It would be good to modify the book keeping a
5629  # bit. Now we write the crab.cfg (which is the
5630  # same for all samples and runs) and the
5631  # multicrab.cfg (which contains blocks for all
5632  # runs of all samples) without updating our book
5633  # keeping. The only place we update the book
5634  # keeping is after writing the harvesting config
5635  # file for a given dataset. Since there is only
5636  # one single harvesting configuration for each
5637  # dataset, we have no book keeping information on
5638  # a per-run basis.
5639  # TODO TODO TODO end
5640 
5641  # Check if the CASTOR output area exists. If
5642  # necessary create it.
5643  self.create_and_check_castor_dirs()
5644 
5645  # Create one crab and one multicrab configuration
5646  # for all jobs together.
5647  self.write_crab_config()
5648  self.write_multicrab_config()
5649 
5650  # Loop over all datasets and create harvesting
5651  # config files for all of them. One harvesting
5652  # config per dataset is enough. The same file will
5653  # be re-used by CRAB for each run.
5654  # NOTE: We always need a harvesting
5655  # configuration. For the two-step harvesting we
5656  # also need a configuration file for the first
5657  # step: the monitoring element extraction.
5658  for dataset_name in self.datasets_to_use.keys():
5659  try:
5660  self.write_harvesting_config(dataset_name)
5661  if self.harvesting_mode == "two-step":
5662  self.write_me_extraction_config(dataset_name)
5663  except:
5664  # Doh! Just re-raise the damn thing.
5665  raise
5666  else:
5667 ## tmp = self.datasets_information[dataset_name] \
5668 ## ["num_events"]
5669  tmp = {}
5670  for run_number in self.datasets_to_use[dataset_name]:
5671  tmp[run_number] = self.datasets_information \
5672  [dataset_name]["num_events"][run_number]
5673  if dataset_name in self.book_keeping_information:
5674  self.book_keeping_information[dataset_name].update(tmp)
5675  else:
5676  self.book_keeping_information[dataset_name] = tmp
5677 
5678  # Explain the user what to do now.
5679  self.show_exit_message()
5680 
5681  except Usage as err:
5682  # self.logger.fatal(err.msg)
5683  # self.option_parser.print_help()
5684  pass
5685 
5686  except Error as err:
5687  # self.logger.fatal(err.msg)
5688  exit_code = 1
5689 
5690  except Exception as err:
5691  # Hmmm, ignore keyboard interrupts from the
5692  # user. These are not a `serious problem'. We also
5693  # skip SystemExit, which is the exception thrown when
5694  # one calls sys.exit(). This, for example, is done by
5695  # the option parser after calling print_help(). We
5696  # also have to catch all `no such option'
5697  # complaints. Everything else we catch here is a
5698  # `serious problem'.
5699  if isinstance(err, SystemExit):
5700  self.logger.fatal(err.code)
5701  elif not isinstance(err, KeyboardInterrupt):
5702  self.logger.fatal("!" * 50)
5703  self.logger.fatal(" This looks like a serious problem.")
5704  self.logger.fatal(" If you are sure you followed all " \
5705  "instructions")
5706  self.logger.fatal(" please copy the below stack trace together")
5707  self.logger.fatal(" with a description of what you were doing to")
5708  self.logger.fatal(" jeroen.hegeman@cern.ch.")
5709  self.logger.fatal(" %s" % self.ident_string())
5710  self.logger.fatal("!" * 50)
5711  self.logger.fatal(str(err))
5712  import traceback
5713  traceback_string = traceback.format_exc()
5714  for line in traceback_string.split("\n"):
5715  self.logger.fatal(line)
5716  self.logger.fatal("!" * 50)
5717  exit_code = 2
5718 
5719  # This is the stuff that we should really do, no matter
5720  # what. Of course cleaning up after ourselves is also done
5721  # from this place. This alsokeeps track of the book keeping
5722  # so far. (This means that if half of the configuration files
5723  # were created before e.g. the disk was full, we should still
5724  # have a consistent book keeping file.
5725  finally:
5726 
5727  self.cleanup()
5728 
5729  ###
5730 
5731  if self.crab_submission == True:
5732  os.system("multicrab -create")
5733  os.system("multicrab -submit")
5734 
5735  # End of run.
5736  return exit_code
5737 
5738  # End of CMSHarvester.
5739 
5740 ###########################################################################
5741 ## Main entry point.
5742 ###########################################################################
5743 
5744 if __name__ == "__main__":
5745  "Main entry point for harvesting."
5746 
5747  CMSHarvester().run()
5748 
5749  # Done.
5750 
5751 ###########################################################################
def create_crab_config(self)
def option_handler_harvesting_mode(self, option, opt_str, value, parser)
def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser)
def check_cmssw(self)
def write_harvesting_config(self, dataset_name)
def option_handler_saveByLumiSection(self, option, opt_str, value, parser)
def option_handler_sites(self, option, opt_str, value, parser)
def write_me_extraction_config(self, dataset_name)
def __init__(self, msg)
def process_runs_use_and_ignore_lists(self)
def build_dataset_ignore_list(self)
def escape_dataset_name(self, dataset_name)
if self.datasets_information[dataset_name]["num_events"][run_number] != 0: pdb.set_trace() DEBUG DEBU...
def startElement(self, name, attrs)
def singlify_datasets(self)
def option_handler_castor_dir(self, option, opt_str, value, parser)
def option_handler_dataset_name(self, option, opt_str, value, parser): """Specify the name(s) of the ...
def option_handler_force(self, option, opt_str, value, parser)
def option_handler_caf_access(self, option, opt_str, value, parser)
def option_handler_frontier_connection(self, option, opt_str, value, parser)
def option_handler_crab_submission(self, option, opt_str, value, parser)
def create_harvesting_config(self, dataset_name)
def create_castor_path_name_special(self, dataset_name, run_number, castor_path_common)
def option_handler_harvesting_type(self, option, opt_str, value, parser)
def write_crab_config(self)
def create_harvesting_config(self, dataset_name): """Create the Python harvesting configuration for a...
def replace(string, replacements)
def write_multicrab_config(self)
def build_runs_ignore_list(self)
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
def option_handler_preferred_site(self, option, opt_str, value, parser)
def option_handler_quiet(self, option, opt_str, value, parser)
def run(self)
def create_castor_path_name_common(self, dataset_name)
def show_exit_message(self)
DEBUG DEBUG DEBUGThis is probably only useful to make sure we don&#39;t muckthings up, right?Figure out across how many sites this sample has been spread.
Helper class: Error exception.
def check_globaltag(self, globaltag=None)
CRAB
def ref_hist_mappings_needed(self, dataset_name=None)
def dbs_resolve_runs(self, dataset_name)
def dbs_resolve_dataset_number_of_events(self, dataset_name): """Ask DBS across how many events this ...
Helper class: Usage exception.
def option_handler_no_ref_hists(self, option, opt_str, value, parser)
def create_me_extraction_config(self, dataset_name)
In case this file is the second step (the real harvestingstep) of the two-step harvesting we have to ...
def create_me_summary_config_file_name(self, dataset_name)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def build_datasets_information(self)
def check_globaltag_exists(self, globaltag, connect_name)
def check_ref_hist_mappings(self)
def check_ref_hist_tag(self, tag_name)
def dbs_resolve_datatype(self, dataset_name)
def create_multicrab_block_name(self, dataset_name, run_number, index)
def __init__(self, cmd_line_opts=None)
def __init__(self, tag_names)
def setup_harvesting_info(self)
Helper class: DBSXMLHandler.
def option_handler_list_types(self, option, opt_str, value, parser)
def create_output_file_name(self, dataset_name, run_number=None)
def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser)
def option_handler_book_keeping_file(self, option, opt_str, value, parser)
def pick_a_site(self, sites, cmssw_version)
self.logger.debug("Checking CASTOR path piece `%s&#39;" % \ piece)
def create_and_check_castor_dir(self, castor_dir)
def dbs_resolve_cmssw_version(self, dataset_name)
def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name)
def dbs_resolve_number_of_events(self, dataset_name, run_number=None)
def set_output_level(self, output_level)
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def load_ref_hist_mappings(self)
def parse_cmd_line_options(self)
def option_handler_input_Jsonfile(self, option, opt_str, value, parser)
def option_handler_input_todofile(self, option, opt_str, value, parser)
def option_handler_input_spec(self, option, opt_str, value, parser)
def create_es_prefer_snippet(self, dataset_name)
def option_handler_debug(self, option, opt_str, value, parser)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:211
def dbs_resolve_globaltag(self, dataset_name)
def check_dbs(self)
def format_conditions_string(self, globaltag)
CMSHarvester class.
def build_runs_list(self, input_method, input_name)
def check_dataset_list(self)
def create_harvesting_config_file_name(self, dataset_name)
Only add the alarming piece to the file name if this isa spread-out dataset.
def __init__(self, msg)
def dbs_check_dataset_spread(self, dataset_name)
def dbs_resolve_dataset_number_of_sites(self, dataset_name): """Ask DBS across how many sites this da...
#define update(a, b)
def build_dataset_use_list(self)
def create_me_summary_output_file_name(self, dataset_name)
def characters(self, content)
def db_account_name_cms_cond_globaltag(self)
def option_handler_no_t1access(self, option, opt_str, value, parser)
def option_handler_globaltag(self, option, opt_str, value, parser)
def db_account_name_cms_cond_dqm_summary(self)
def check_input_status(self)
def dbs_resolve_dataset_name(self, dataset_name)
#define str(s)
def create_config_file_name(self, dataset_name, run_number)
def setup_dbs(self)
Now we try to do a very simple DBS search.
def build_runs_use_list(self)
def create_multicrab_config(self)
CRAB
def create_and_check_castor_dirs(self)
Helper class: CMSHarvesterHelpFormatter.
double split
Definition: MVATrainer.cc:139
def create_harvesting_output_file_name(self, dataset_name, run_number)
def endElement(self, name)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run
def process_dataset_ignore_list(self)
def build_dataset_list(self, input_method, input_name)
class Handler(xml.sax.handler.ContentHandler): def startElement(self, name, attrs): if name == "resul...