
cmsHarvester.py
1 #!/usr/bin/env python
2 
3 ###########################################################################
4 ## File : cmsHarvester.py
5 ## Authors : Jeroen Hegeman (jeroen.hegeman@cern.ch)
6 ## Niklas Pietsch (niklas.pietsch@desy.de)
7 ## Francesco Costanza (francesco.costanza@desy.de)
8 ## Last change: 20100308
9 
14 
15 """Main program to run all kinds of harvesting.
16 
17 These are the basic kinds of harvesting implemented (contact me if
18 your favourite is missing):
19 
20 - RelVal : Run for release validation samples. Makes heavy use of MC
21  truth information.
22 
23 - RelValFS: FastSim RelVal.
24 
25 - MC : Run for MC samples.
26 
27 - DQMOffline : Run for real data (could also be run for MC).
28 
29 For the mappings of these harvesting types to sequence names please
30 see the setup_harvesting_info() and option_handler_list_types()
31 methods.
32 
33 """
34 
35 ###########################################################################
36 
37 __version__ = "3.8.2p1" # (version jump to match release)
38 __author__ = "Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
39  "Niklas Pietsch (niklas.pietsch@desy.de)"
40 
41 twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
42 
43 ###########################################################################
44 
45 ###########################################################################
46 ## TODO list
47 
88 
89 import os
90 import sys
91 import commands
92 import re
93 import logging
94 import optparse
95 import datetime
96 import copy
97 from inspect import getargspec
98 from random import choice
99 
100 
101 # These we need to communicate with DBS global DBSAPI
102 from DBSAPI.dbsApi import DbsApi
103 import DBSAPI.dbsException
105 from functools import reduce
106 # and these we need to parse the DBS output.
107 global xml
108 global SAXParseException
109 import xml.sax
110 from xml.sax import SAXParseException
111 
112 import Configuration.PyReleaseValidation
113 from Configuration.PyReleaseValidation.ConfigBuilder import \
114  ConfigBuilder, defaultOptions
115 # from Configuration.PyReleaseValidation.cmsDriverOptions import options, python_config_filename
116 
117 #import FWCore.ParameterSet.Config as cms
118 
119 # Debugging stuff.
120 import pdb
121 try:
122  import debug_hook
123 except ImportError:
124  pass
125 
126 ###########################################################################
127 ## Helper class: Usage exception.
128 ###########################################################################
129 class Usage(Exception):
130  def __init__(self, msg):
131  self.msg = msg
132  def __str__(self):
133  return repr(self.msg)
134 
135  # End of Usage.
136 
137 ###########################################################################
138 ## Helper class: Error exception.
139 ###########################################################################
140 class Error(Exception):
141  def __init__(self, msg):
142  self.msg = msg
143  def __str__(self):
144  return repr(self.msg)
145 
146 ###########################################################################
147 ## Helper class: CMSHarvesterHelpFormatter.
148 ###########################################################################
149 
150 class CMSHarvesterHelpFormatter(optparse.IndentedHelpFormatter):
151  """Helper class to add some customised help output to cmsHarvester.
152 
153  We want to add some instructions, as well as a pointer to the CMS
154  Twiki.
155 
156  """
157 
158  def format_usage(self, usage):
159 
160  usage_lines = []
161 
162  sep_line = "-" * 60
163  usage_lines.append(sep_line)
164  usage_lines.append("Welcome to the CMS harvester, a (hopefully useful)")
165  usage_lines.append("tool to create harvesting configurations.")
166  usage_lines.append("For more information please have a look at the CMS Twiki:")
167  usage_lines.append(" %s" % twiki_url)
168  usage_lines.append(sep_line)
169  usage_lines.append("")
170 
171  # Since we only add to the output, we now just append the
172  # original output from IndentedHelpFormatter.
173  usage_lines.append(optparse.IndentedHelpFormatter. \
174  format_usage(self, usage))
175 
176  formatted_usage = "\n".join(usage_lines)
177  return formatted_usage
178 
179  # End of CMSHarvesterHelpFormatter.
180 
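# A minimal sketch of how this formatter is meant to be hooked into
# optparse (the option setup itself lives elsewhere in this file, so the
# parser below is only illustrative):
#
#   parser = optparse.OptionParser(usage="%prog [options]",
#                                  formatter=CMSHarvesterHelpFormatter())
#   parser.print_help()
#
# optparse then calls format_usage() above, so the Twiki pointer is
# prepended to the regular usage text.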
181 ###########################################################################
182 ## Helper class: DBSXMLHandler.
183 ###########################################################################
184 
185 class DBSXMLHandler(xml.sax.handler.ContentHandler):
186  """XML handler class to parse DBS results.
187 
188  The tricky thing here is that older DBS versions (2.0.5 and
189  earlier) return results in a different XML format than newer
190  versions. Previously the result values were returned as attributes
191  to the `result' element. The new approach returns result values as
192  contents of named elements.
193 
194  The old approach is handled directly in startElement(), the new
195  approach in characters().
196 
197  NOTE: All results are returned in the form of string values of
198  course!
199 
200  """
201 
202  # This is the required mapping from the name of the variable we
203  # ask for to what we call it ourselves. (Effectively this is the
204  # mapping between the old attribute key name and the new element
205  # name.)
206  mapping = {
207  "dataset" : "PATH",
208  "dataset.tag" : "PROCESSEDDATASET_GLOBALTAG",
209  "datatype.type" : "PRIMARYDSTYPE_TYPE",
210  "run" : "RUNS_RUNNUMBER",
211  "run.number" : "RUNS_RUNNUMBER",
212  "file.name" : "FILES_LOGICALFILENAME",
213  "file.numevents" : "FILES_NUMBEROFEVENTS",
214  "algo.version" : "APPVERSION_VERSION",
215  "site" : "STORAGEELEMENT_SENAME",
216  }
217 
218  def __init__(self, tag_names):
219  # This is a list used as stack to keep track of where we are
220  # in the element tree.
221  self.element_position = []
222  self.tag_names = tag_names
223  self.results = {}
224 
225  def startElement(self, name, attrs):
226  self.element_position.append(name)
227 
228  self.current_value = []
229 
230  #----------
231 
232  # This is to catch results from DBS 2.0.5 and earlier.
233  if name == "result":
234  for name in self.tag_names:
235  key = DBSXMLHandler.mapping[name]
236  value = str(attrs[key])
237  try:
238  self.results[name].append(value)
239  except KeyError:
240  self.results[name] = [value]
241 
242  #----------
243 
244  def endElement(self, name):
245  assert self.current_element() == name, \
246  "closing unopenend element `%s'" % name
247 
248  if self.current_element() in self.tag_names:
249  contents = "".join(self.current_value)
250  if self.current_element() in self.results:
251  self.results[self.current_element()].append(contents)
252  else:
253  self.results[self.current_element()] = [contents]
254 
255  self.element_position.pop()
256 
257  def characters(self, content):
258  # NOTE: It is possible that the actual contents of the tag
259  # gets split into multiple pieces. This method will be called
260  # for each of the pieces. This means we have to concatenate
261  # everything ourselves.
262  if self.current_element() in self.tag_names:
263  self.current_value.append(content)
264 
265  def current_element(self):
266  return self.element_position[-1]
267 
269  """Make sure that all results arrays have equal length.
270 
271  We should have received complete rows from DBS. I.e. all
272  results arrays in the handler should be of equal length.
273 
274  """
275 
276  results_valid = True
277 
278  res_names = self.results.keys()
279  if len(res_names) > 1:
280  for res_name in res_names[1:]:
281  res_tmp = self.results[res_name]
282  if len(res_tmp) != len(self.results[res_names[0]]):
283  results_valid = False
284 
285  return results_valid
286 
287  # End of DBSXMLHandler.
288 
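# A minimal usage sketch for the handler above, assuming `dbs_xml_reply'
# holds the raw XML string returned by a DBS query (hypothetical variable
# name):
#
#   handler = DBSXMLHandler(["dataset", "run"])
#   xml.sax.parseString(dbs_xml_reply, handler)
#   if handler.check_results_validity():
#       dataset_paths = handler.results["dataset"]
#
# The same handler instance copes with both the old (attribute-based) and
# the new (element-based) DBS reply formats.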
289 ###########################################################################
290 ## CMSHarvester class.
291 ###########################################################################
292 
294  """Class to perform CMS harvesting.
295 
296  More documentation `obviously' to follow.
297 
298  """
299 
300  ##########
301 
302  def __init__(self, cmd_line_opts=None):
303  "Initialize class and process command line options."
304 
305  self.version = __version__
306 
307  # These are the harvesting types allowed. See the head of this
308  # file for more information.
310  "RelVal",
311  "RelValFS",
312  "MC",
313  "DQMOffline",
314  ]
315 
316  # These are the possible harvesting modes:
317  # - Single-step: harvesting takes place on-site in a single
318  # step. For each samples only a single ROOT file containing
319  # the harvesting results is returned.
320  # - Single-step-allow-partial: special hack to allow
321  # harvesting of partial statistics using single-step
322  # harvesting on spread-out samples.
323  # - Two-step: harvesting takes place in two steps. The first
324  # step returns a series of monitoring element summaries for
325  # each sample. The second step then merges these summaries
326  # locally and does the real harvesting. This second step
327  # produces the ROOT file containing the harvesting results.
329  "single-step",
330  "single-step-allow-partial",
331  "two-step"
332  ]
333 
334  # It is possible to specify a GlobalTag that will override any
335  # choices (regarding GlobalTags) made by the cmsHarvester.
336  # BUG BUG BUG
337  # For the moment, until I figure out a way to obtain the
338  # GlobalTag with which a given data (!) dataset was created,
339  # it is necessary to specify a GlobalTag for harvesting of
340  # data.
341  # BUG BUG BUG end
342  self.globaltag = None
343 
344  # It's also possible to switch off the use of reference
345  # histograms altogether.
346  self.use_ref_hists = True
347 
348  # The database name and account are hard-coded. They are not
349  # likely to change before the end-of-life of this tool. But of
350  # course there is a way to override this from the command
351  # line. One can even override the Frontier connection used for
352  # the GlobalTag and for the reference histograms
353  # independently. Please only use this for testing purposes.
354  self.frontier_connection_name = {}
355  self.frontier_connection_name["globaltag"] = "frontier://" \
356  "FrontierProd/"
357  self.frontier_connection_name["refhists"] = "frontier://" \
358  "FrontierProd/"
359  self.frontier_connection_overridden = {}
360  for key in self.frontier_connection_name.keys():
361  self.frontier_connection_overridden[key] = False
362 
363  # This contains information specific to each of the harvesting
364  # types. Used to create the harvesting configuration. It is
365  # filled by setup_harvesting_info().
366  self.harvesting_info = None
367 
368  ###
369 
370  # These are default `unused' values that will be filled in
371  # depending on the command line options.
372 
373  # The type of harvesting we're doing. See
374  # self.harvesting_types for allowed types.
375  self.harvesting_type = None
376 
377  # The harvesting mode, popularly known as single-step
378  # vs. two-step. The thing to remember at this point is that
379  # single-step is only possible for samples located completely
380  # at a single site (i.e. SE).
381  self.harvesting_mode = None
382  # BUG BUG BUG
383  # Default temporarily set to single-step until we can get staged
384  # jobs working with CRAB.
385  self.harvesting_mode_default = "single-step"
386  # BUG BUG BUG end
387 
388  # The input method: are we reading a dataset name (or regexp)
389  # directly from the command line or are we reading a file
390  # containing a list of dataset specifications. Actually we
391  # keep one of each for both datasets and runs.
392  self.input_method = {}
393  self.input_method["datasets"] = {}
394  self.input_method["datasets"]["use"] = None
395  self.input_method["datasets"]["ignore"] = None
396  self.input_method["runs"] = {}
397  self.input_method["runs"]["use"] = None
398  self.input_method["runs"]["ignore"] = None
400  # The name of whatever input we're using.
401  self.input_name = {}
402  self.input_name["datasets"] = {}
403  self.input_name["datasets"]["use"] = None
404  self.input_name["datasets"]["ignore"] = None
405  self.input_name["runs"] = {}
406  self.input_name["runs"]["use"] = None
407  self.input_name["runs"]["ignore"] = None
408 
409  self.Jsonlumi = False
410  self.Jsonfilename = "YourJSON.txt"
411  self.Jsonrunfilename = "YourJSON.txt"
412  self.todofile = "YourToDofile.txt"
413 
414  # If this is true, we're running in `force mode'. In this case
415  # the sanity checks are performed but failure will not halt
416  # everything.
417  self.force_running = None
418 
419  # The base path of the output dir in CASTOR.
420  self.castor_base_dir = None
421  self.castor_base_dir_default = "/castor/cern.ch/" \
422  "cms/store/temp/" \
423  "dqm/offline/harvesting_output/"
424 
425  # The name of the file to be used for book keeping: which
426  # datasets, runs, etc. we have already processed.
427  self.book_keeping_file_name = None
428  self.book_keeping_file_name_default = "harvesting_accounting.txt"
429 
430  # The dataset name to reference histogram name mapping is read
431  # from a text file. The name of this file is kept in the
432  # following variable.
433  self.ref_hist_mappings_file_name = None
434  # And this is the default value.
435  self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"
436 
437  # Hmmm, hard-coded prefix of the CERN CASTOR area. This is the
438  # only supported CASTOR area.
439  # NOTE: Make sure this one starts with a `/'.
440  self.castor_prefix = "/castor/cern.ch"
441 
442  # Normally the central harvesting should be done using the
443  # `t1access' grid role. To be able to run without T1 access
444  # the --no-t1access flag can be used. This variable keeps
445  # track of that special mode.
446  self.non_t1access = False
447  self.caf_access = False
448  self.saveByLumiSection = False
449  self.crab_submission = False
450  self.nr_max_sites = 1
451 
452  self.preferred_site = "no preference"
453 
454  # This will become the list of datasets and runs to consider
455  self.datasets_to_use = {}
456  # and this will become the list of datasets and runs to skip.
457  self.datasets_to_ignore = {}
458  # This, in turn, will hold all book keeping information.
459  self.book_keeping_information = {}
460  # And this is where the dataset name to reference histogram
461  # name mapping is stored.
462  self.ref_hist_mappings = {}
463 
464  # We're now also allowing run selection. This means we also
465  # have to keep list of runs requested and vetoed by the user.
466  self.runs_to_use = {}
467  self.runs_to_ignore = {}
468 
469  # Cache for CMSSW version availability at different sites.
470  self.sites_and_versions_cache = {}
471 
472  # Cache for checked GlobalTags.
473  self.globaltag_check_cache = []
474 
475  # Global flag to see if there were any jobs for which we could
476  # not find a matching site.
477  self.all_sites_found = True
478 
479  # Helper string centrally defined.
480  self.no_matching_site_found_str = "no_matching_site_found"
481 
482  # Store command line options for later use.
483  if cmd_line_opts is None:
484  cmd_line_opts = sys.argv[1:]
485  self.cmd_line_opts = cmd_line_opts
486 
487  # Set up the logger.
488  log_handler = logging.StreamHandler()
489  # This is the default log formatter, the debug option switches
490  # on some more information.
491  log_formatter = logging.Formatter("%(message)s")
492  log_handler.setFormatter(log_formatter)
493  logger = logging.getLogger()
494  logger.name = "main"
495  logger.addHandler(log_handler)
496  self.logger = logger
497  # The default output mode is quite verbose.
498  self.set_output_level("NORMAL")
499 
500  #logger.debug("Initialized successfully")
501 
502  # End of __init__.
503 
504  ##########
505 
506  def cleanup(self):
507  "Clean up after ourselves."
508 
509  # NOTE: This is the safe replacement of __del__.
510 
511  #self.logger.debug("All done -> shutting down")
512  logging.shutdown()
513 
514  # End of cleanup.
515 
516  ##########
517 
518  def time_stamp(self):
519  "Create a timestamp to use in the created config files."
520 
521  time_now = datetime.datetime.utcnow()
522  # We don't care about the microseconds.
523  time_now = time_now.replace(microsecond = 0)
524  time_stamp = "%sUTC" % datetime.datetime.isoformat(time_now)
525 
526  # End of time_stamp.
527  return time_stamp
528 
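# Example of the resulting format (the value obviously depends on when
# the method is called): "2010-03-08T12:00:00UTC".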
529  ##########
530 
531  def ident_string(self):
532  "Spit out an identification string for cmsHarvester.py."
533 
534  ident_str = "`cmsHarvester.py " \
535  "version %s': cmsHarvester.py %s" % \
536  (__version__,
537  reduce(lambda x, y: x+' '+y, sys.argv[1:]))
538 
539  return ident_str
540 
541  ##########
542 
543  def format_conditions_string(self, globaltag):
544  """Create the conditions string needed for `cmsDriver'.
545 
546  Just glueing the FrontierConditions bit in front of it really.
547 
548  """
549 
550  # Not very robust but okay. The idea is that if the user
551  # specified (since this cannot happen with GlobalTags coming
552  # from DBS) something containing `conditions', they probably
553  # know what they're doing and we should not muck things up. In
554  # all other cases we just assume we only received the
555  # GlobalTag part and we build the usual conditions string from
556  # that.
557  if globaltag.lower().find("conditions") > -1:
558  conditions_string = globaltag
559  else:
560  conditions_string = "FrontierConditions_GlobalTag,%s" % \
561  globaltag
562 
563  # End of format_conditions_string.
564  return conditions_string
565 
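# Worked example (GlobalTag name purely illustrative):
#
#   format_conditions_string("GR_R_38X_V9::All")
#     -> "FrontierConditions_GlobalTag,GR_R_38X_V9::All"
#
#   format_conditions_string("FrontierConditions_GlobalTag,GR_R_38X_V9::All")
#     -> returned unchanged, since it already contains `conditions'.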
566  ##########
567 
569  """Return the database account name used to store the GlobalTag.
570 
571  The name of the database account depends (albeit weakly) on
572  the CMSSW release version.
573 
574  """
575 
576  # This never changed, unlike the cms_cond_31X_DQM_SUMMARY ->
577  # cms_cond_34X_DQM transition.
578  account_name = "CMS_COND_31X_GLOBALTAG"
579 
580  # End of db_account_name_cms_cond_globaltag.
581  return account_name
582 
583  ##########
584 
586  """See db_account_name_cms_cond_globaltag."""
587 
588  account_name = None
589  version = self.cmssw_version[6:11]
590  if version < "3_4_0":
591  account_name = "CMS_COND_31X_DQM_SUMMARY"
592  else:
593  account_name = "CMS_COND_34X"
594 
595  # End of db_account_name_cms_cond_dqm_summary.
596  return account_name
597 
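# Worked example of the version slicing used above:
#
#   "CMSSW_3_3_6"[6:11] -> "3_3_6"  (< "3_4_0") -> "CMS_COND_31X_DQM_SUMMARY"
#   "CMSSW_3_8_2"[6:11] -> "3_8_2"              -> "CMS_COND_34X"
#
# NOTE: This is a plain string comparison, which is fine for the release
# range this tool targets.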
598  ##########
599 
601  "Create a nice header to be used to mark the generated files."
602 
603  tmp = []
604 
605  time_stamp = self.time_stamp()
606  ident_str = self.ident_string()
607  tmp.append("# %s" % time_stamp)
608  tmp.append("# WARNING: This file was created automatically!")
609  tmp.append("")
610  tmp.append("# Created by %s" % ident_str)
611 
612  header = "\n".join(tmp)
613 
614  # End of config_file_header.
615  return header
616 
617  ##########
618 
619  def set_output_level(self, output_level):
620  """Adjust the level of output generated.
621 
622  Choices are:
623  - normal : default level of output
624  - quiet : less output than the default
625  - verbose : some additional information
626  - debug : lots more information, may be overwhelming
627 
628  NOTE: The debug option is a bit special in the sense that it
629  also modifies the output format.
630 
631  """
632 
633  # NOTE: These levels are hooked up to the ones used in the
634  # logging module.
635  output_levels = {
636  "NORMAL" : logging.INFO,
637  "QUIET" : logging.WARNING,
638  "VERBOSE" : logging.INFO,
639  "DEBUG" : logging.DEBUG
640  }
641 
642  output_level = output_level.upper()
643 
644  try:
645  # Update the logger.
646  self.log_level = output_levels[output_level]
647  self.logger.setLevel(self.log_level)
648  except KeyError:
649  # Show a complaint
650  self.logger.fatal("Unknown output level `%s'" % output_level)
651  # and re-raise an exception.
652  raise Exception
653 
654  # End of set_output_level.
655 
656  ##########
657 
658  def option_handler_debug(self, option, opt_str, value, parser):
659  """Switch to debug mode.
660 
661  This both increases the amount of output generated, as well as
662  changes the format used (more detailed information is given).
663 
664  """
665 
666  # Switch to a more informative log formatter for debugging.
667  log_formatter_debug = logging.Formatter("[%(levelname)s] " \
668  # NOTE: funcName was
669  # only implemented
670  # starting with python
671  # 2.5.
672  #"%(funcName)s() " \
673  #"@%(filename)s:%(lineno)d " \
674  "%(message)s")
675  # Hmmm, not very nice. This assumes there's only a single
676  # handler associated with the current logger.
677  log_handler = self.logger.handlers[0]
678  log_handler.setFormatter(log_formatter_debug)
679  self.set_output_level("DEBUG")
680 
681  # End of option_handler_debug.
682 
683  ##########
684 
685  def option_handler_quiet(self, option, opt_str, value, parser):
686  "Switch to quiet mode: less verbose."
687 
688  self.set_output_level("QUIET")
689 
690  # End of option_handler_quiet.
691 
692  ##########
693 
694  def option_handler_force(self, option, opt_str, value, parser):
695  """Switch on `force mode' in which case we don't brake for nobody.
696 
697  In so-called `force mode' all sanity checks are performed but
698  we don't halt on failure. Of course this requires some care
699  from the user.
700 
701  """
702 
703  self.logger.debug("Switching on `force mode'.")
704  self.force_running = True
705 
706  # End of option_handler_force.
707 
708  ##########
709 
710  def option_handler_harvesting_type(self, option, opt_str, value, parser):
711  """Set the harvesting type to be used.
712 
713  This checks that no harvesting type is already set, and sets
714  the harvesting type to be used to the one specified. If a
715  harvesting type is already set an exception is thrown. The
716  same happens when an unknown type is specified.
717 
718  """
719 
720  # Check for (in)valid harvesting types.
721  # NOTE: The matching is done in a bit of a complicated
722  # way. This allows the specification of the type to be
723  # case-insensitive while still ending up with the properly
724  # `cased' version afterwards.
725  value = value.lower()
726  harvesting_types_lowered = [i.lower() for i in self.harvesting_types]
727  try:
728  type_index = harvesting_types_lowered.index(value)
729  # If this works, we now have the index to the `properly
730  # cased' version of the harvesting type.
731  except ValueError:
732  self.logger.fatal("Unknown harvesting type `%s'" % \
733  value)
734  self.logger.fatal(" possible types are: %s" %
735  ", ".join(self.harvesting_types))
736  raise Usage("Unknown harvesting type `%s'" % \
737  value)
738 
739  # Check if multiple (by definition conflicting) harvesting
740  # types are being specified.
741  if not self.harvesting_type is None:
742  msg = "Only one harvesting type should be specified"
743  self.logger.fatal(msg)
744  raise Usage(msg)
745  self.harvesting_type = self.harvesting_types[type_index]
746 
747  self.logger.info("Harvesting type to be used: `%s'" % \
748  self.harvesting_type)
749 
750  # End of option_handler_harvesting_type.
751 
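# Worked example of the case-insensitive matching above:
#
#   value = "relval"
#   harvesting_types_lowered = ["relval", "relvalfs", "mc", "dqmoffline"]
#   type_index = 0  ->  self.harvesting_type = "RelVal"
#
# i.e. the user can type the name in any case and the properly cased
# version from self.harvesting_types is stored.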
752  ##########
753 
754  def option_handler_harvesting_mode(self, option, opt_str, value, parser):
755  """Set the harvesting mode to be used.
756 
757  Single-step harvesting can be used for samples that are
758  located completely at a single site (= SE). Otherwise use
759  two-step mode.
760 
761  """
762 
763  # Check for valid mode.
764  harvesting_mode = value.lower()
765  if not harvesting_mode in self.harvesting_modes:
766  msg = "Unknown harvesting mode `%s'" % harvesting_mode
767  self.logger.fatal(msg)
768  self.logger.fatal(" possible modes are: %s" % \
769  ", ".join(self.harvesting_modes))
770  raise Usage(msg)
771 
772  # Check if we've been given only a single mode, otherwise
773  # complain.
774  if not self.harvesting_mode is None:
775  msg = "Only one harvesting mode should be specified"
776  self.logger.fatal(msg)
777  raise Usage(msg)
778  self.harvesting_mode = harvesting_mode
779 
780  self.logger.info("Harvesting mode to be used: `%s'" % \
781  self.harvesting_mode)
782 
783  # End of option_handler_harvesting_mode.
784 
785  ##########
786 
787  def option_handler_globaltag(self, option, opt_str, value, parser):
788  """Set the GlobalTag to be used, overriding our own choices.
789 
790  By default the cmsHarvester will use the GlobalTag with which
791  a given dataset was created also for the harvesting. The
792  --globaltag option is the way to override this behaviour.
793 
794  """
795 
796  # Make sure that this flag only occurred once.
797  if not self.globaltag is None:
798  msg = "Only one GlobalTag should be specified"
799  self.logger.fatal(msg)
800  raise Usage(msg)
801  self.globaltag = value
802 
803  self.logger.info("GlobalTag to be used: `%s'" % \
804  self.globaltag)
805 
806  # End of option_handler_globaltag.
807 
808  ##########
809 
810  def option_handler_no_ref_hists(self, option, opt_str, value, parser):
811  "Switch use of all reference histograms off."
812 
813  self.use_ref_hists = False
814 
815  self.logger.warning("Switching off all use of reference histograms")
816 
817  # End of option_handler_no_ref_hists.
818 
819  ##########
820 
821  def option_handler_frontier_connection(self, option, opt_str,
822  value, parser):
823  """Override the default Frontier connection string.
824 
825  Please only use this for testing (e.g. when a test payload has
826  been inserted into cms_orc_off instead of cms_orc_on).
827 
828  This method gets called for three different command line
829  options:
830  - --frontier-connection,
831  - --frontier-connection-for-globaltag,
832  - --frontier-connection-for-refhists.
833  Appropriate care has to be taken to make sure things are only
834  specified once.
835 
836  """
837 
838  # Figure out with which command line option we've been called.
839  frontier_type = opt_str.split("-")[-1]
840  if frontier_type == "connection":
841  # Main option: change all connection strings.
842  frontier_types = self.frontier_connection_name.keys()
843  else:
844  frontier_types = [frontier_type]
845 
846  # Make sure that each Frontier connection is specified only
847  # once. (Okay, in a bit of a dodgy way...)
848  for connection_name in frontier_types:
849  if self.frontier_connection_overridden[connection_name] == True:
850  msg = "Please specify either:\n" \
851  " `--frontier-connection' to change the " \
852  "Frontier connection used for everything, or\n" \
853  "either one or both of\n" \
854  " `--frontier-connection-for-globaltag' to " \
855  "change the Frontier connection used for the " \
856  "GlobalTag and\n" \
857  " `--frontier-connection-for-refhists' to change " \
858  "the Frontier connection used for the " \
859  "reference histograms."
860  self.logger.fatal(msg)
861  raise Usage(msg)
862 
863  frontier_prefix = "frontier://"
864  if not value.startswith(frontier_prefix):
865  msg = "Expecting Frontier connections to start with " \
866  "`%s'. You specified `%s'." % \
867  (frontier_prefix, value)
868  self.logger.fatal(msg)
869  raise Usage(msg)
870  # We also kind of expect this to be either FrontierPrep or
871  # FrontierProd (but this is just a warning).
872  if value.find("FrontierProd") < 0 and \
873  value.find("FrontierPrep") < 0:
874  msg = "Expecting Frontier connections to contain either " \
875  "`FrontierProd' or `FrontierPrep'. You specified " \
876  "`%s'. Are you sure?" % \
877  value
878  self.logger.warning(msg)
879 
880  if not value.endswith("/"):
881  value += "/"
882 
883  for connection_name in frontier_types:
884  self.frontier_connection_name[connection_name] = value
885  self.frontier_connection_overridden[connection_name] = True
886 
887  frontier_type_str = "unknown"
888  if connection_name == "globaltag":
889  frontier_type_str = "the GlobalTag"
890  elif connection_name == "refhists":
891  frontier_type_str = "the reference histograms"
892 
893  self.logger.warning("Overriding default Frontier " \
894  "connection for %s " \
895  "with `%s'" % \
896  (frontier_type_str,
897  self.frontier_connection_name[connection_name]))
898 
899  # End of option_handler_frontier_connection
900 
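# Worked example of how the option string selects the connection(s):
#
#   "--frontier-connection".split("-")[-1]               -> "connection"
#       -> both the `globaltag' and `refhists' connections are overridden
#   "--frontier-connection-for-globaltag".split("-")[-1] -> "globaltag"
#   "--frontier-connection-for-refhists".split("-")[-1]  -> "refhists"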
901  ##########
902 
903  def option_handler_input_todofile(self, option, opt_str, value, parser):
904 
905  self.todofile = value
906  # End of option_handler_input_todofile.
907 
908  ##########
909 
910  def option_handler_input_Jsonfile(self, option, opt_str, value, parser):
911 
912  self.Jsonfilename = value
913  # End of option_handler_input_Jsonfile.
914 
915  ##########
916 
917  def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser):
918 
919  self.Jsonrunfilename = value
920  # End of option_handler_input_Jsonrunfile.
921 
922  ##########
923 
924  def option_handler_input_spec(self, option, opt_str, value, parser):
925  """TODO TODO TODO
926  Document this...
927 
928  """
929 
930  # Figure out if we were called for the `use these' or the
931  # `ignore these' case.
932  if opt_str.lower().find("ignore") > -1:
933  spec_type = "ignore"
934  else:
935  spec_type = "use"
936 
937  # Similar: are we being called for datasets or for runs?
938  if opt_str.lower().find("dataset") > -1:
939  select_type = "datasets"
940  else:
941  select_type = "runs"
942 
943  if not self.input_method[select_type][spec_type] is None:
944  msg = "Please only specify one input method " \
945  "(for the `%s' case)" % opt_str
946  self.logger.fatal(msg)
947  raise Usage(msg)
948 
949  input_method = opt_str.replace("-", "").replace("ignore", "")
950  self.input_method[select_type][spec_type] = input_method
951  self.input_name[select_type][spec_type] = value
952 
953  self.logger.debug("Input method for the `%s' case: %s" % \
954  (spec_type, input_method))
955 
956  # End of option_handler_input_spec
957 
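# Worked example (the option string and dataset pattern shown here are
# illustrative; the real option names are defined where the option parser
# is set up):
#
#   opt_str = "--dataset-ignore", value = "/Cosmics/*/RECO"
#     -> spec_type    = "ignore"   (opt_str contains "ignore")
#     -> select_type  = "datasets" (opt_str contains "dataset")
#     -> input_method = "dataset"  (dashes and "ignore" stripped)
#     -> self.input_name["datasets"]["ignore"] = "/Cosmics/*/RECO"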
958  ##########
959 
960  def option_handler_book_keeping_file(self, option, opt_str, value, parser):
961  """Store the name of the file to be used for book keeping.
962 
963  The only check done here is that only a single book keeping
964  file is specified.
965 
966  """
967 
968  file_name = value
969 
970  if not self.book_keeping_file_name is None:
971  msg = "Only one book keeping file should be specified"
972  self.logger.fatal(msg)
973  raise Usage(msg)
974  self.book_keeping_file_name = file_name
975 
976  self.logger.info("Book keeping file to be used: `%s'" % \
977  self.book_keeping_file_name)
978 
979  # End of option_handler_book_keeping_file.
980 
981  ##########
982 
983  def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser):
984  """Store the name of the file for the ref. histogram mapping.
985 
986  """
987 
988  file_name = value
989 
990  if not self.ref_hist_mappings_file_name is None:
991  msg = "Only one reference histogram mapping file " \
992  "should be specified"
993  self.logger.fatal(msg)
994  raise Usage(msg)
995  self.ref_hist_mappings_file_name = file_name
996 
997  self.logger.info("Reference histogram mapping file " \
998  "to be used: `%s'" % \
1000 
1001  # End of option_handler_ref_hist_mapping_file.
1002 
1003  ##########
1004 
1005  # OBSOLETE OBSOLETE OBSOLETE
1006 
1007 ## def option_handler_dataset_name(self, option, opt_str, value, parser):
1008 ## """Specify the name(s) of the dataset(s) to be processed.
1009 
1010 ## It is checked to make sure that no dataset name or listfile
1011 ## names are given yet. If all is well (i.e. we still have a
1012 ## clean slate) the dataset name is stored for later use,
1013 ## otherwise a Usage exception is raised.
1014 
1015 ## """
1016 
1017 ## if not self.input_method is None:
1018 ## if self.input_method == "dataset":
1019 ## raise Usage("Please only feed me one dataset specification")
1020 ## elif self.input_method == "listfile":
1021 ## raise Usage("Cannot specify both dataset and input list file")
1022 ## else:
1023 ## assert False, "Unknown input method `%s'" % self.input_method
1024 ## self.input_method = "dataset"
1025 ## self.input_name = value
1026 ## self.logger.info("Input method used: %s" % self.input_method)
1027 
1028 ## # End of option_handler_dataset_name.
1029 
1030 ## ##########
1031 
1032 ## def option_handler_listfile_name(self, option, opt_str, value, parser):
1033 ## """Specify the input list file containing datasets to be processed.
1034 
1035 ## It is checked to make sure that no dataset name or listfile
1036 ## names are given yet. If all is well (i.e. we still have a
1037 ## clean slate) the listfile name is stored for later use,
1038 ## otherwise a Usage exception is raised.
1039 
1040 ## """
1041 
1042 ## if not self.input_method is None:
1043 ## if self.input_method == "listfile":
1044 ## raise Usage("Please only feed me one list file")
1045 ## elif self.input_method == "dataset":
1046 ## raise Usage("Cannot specify both dataset and input list file")
1047 ## else:
1048 ## assert False, "Unknown input method `%s'" % self.input_method
1049 ## self.input_method = "listfile"
1050 ## self.input_name = value
1051 ## self.logger.info("Input method used: %s" % self.input_method)
1052 
1053 ## # End of option_handler_listfile_name.
1054 
1055  # OBSOLETE OBSOLETE OBSOLETE end
1056 
1057  ##########
1058 
1059  def option_handler_castor_dir(self, option, opt_str, value, parser):
1060  """Specify where on CASTOR the output should go.
1061 
1062  At the moment only output to CERN CASTOR is
1063  supported. Eventually the harvested results should go into the
1064  central place for DQM on CASTOR anyway.
1065 
1066  """
1067 
1068  # Check format of specified CASTOR area.
1069  castor_dir = value
1070  #castor_dir = castor_dir.lstrip(os.path.sep)
1071  castor_prefix = self.castor_prefix
1072 
1073  # Add a leading slash if necessary and clean up the path.
1074  castor_dir = os.path.join(os.path.sep, castor_dir)
1075  self.castor_base_dir = os.path.normpath(castor_dir)
1076 
1077  self.logger.info("CASTOR (base) area to be used: `%s'" % \
1078  self.castor_base_dir)
1079 
1080  # End of option_handler_castor_dir.
1081 
1082  ##########
1083 
1084  def option_handler_no_t1access(self, option, opt_str, value, parser):
1085  """Set the self.no_t1access flag to try and create jobs that
1086  run without special `t1access' role.
1087 
1088  """
1089 
1090  self.non_t1access = True
1091 
1092  self.logger.warning("Running in `non-t1access' mode. " \
1093  "Will try to create jobs that run " \
1094  "without special rights but no " \
1095  "further promises...")
1096 
1097  # End of option_handler_no_t1access.
1098 
1099  ##########
1100 
1101  def option_handler_caf_access(self, option, opt_str, value, parser):
1102  """Set the self.caf_access flag to try and create jobs that
1103  run on the CAF.
1104 
1105  """
1106  self.caf_access = True
1107 
1108  self.logger.warning("Running in `caf_access' mode. " \
1109  "Will try to create jobs that run " \
1110  "on CAF but no" \
1111  "further promises...")
1112 
1113  # End of option_handler_caf_access.
1114 
1115  ##########
1116 
1117  def option_handler_saveByLumiSection(self, option, opt_str, value, parser):
1118  """Set process.dqmSaver.saveByLumiSectiont=1 in cfg harvesting file
1119  """
1120  self.saveByLumiSection = True
1121 
1122  self.logger.warning("waning concerning saveByLumiSection option")
1123 
1124  # End of option_handler_saveByLumiSection.
1125 
1126 
1127  ##########
1128 
1129  def option_handler_crab_submission(self, option, opt_str, value, parser):
1130  """Crab jobs are not created and
1131  "submitted automatically",
1132  """
1133  self.crab_submission = True
1134 
1135  # End of option_handler_crab_submission.
1136 
1137  ##########
1138 
1139  def option_handler_sites(self, option, opt_str, value, parser):
1140 
1141  self.nr_max_sites = value
1142 
1143  ##########
1144 
1145  def option_handler_preferred_site(self, option, opt_str, value, parser):
1146 
1147  self.preferred_site = value
1148 
1149  ##########
1150 
1151  def option_handler_list_types(self, option, opt_str, value, parser):
1152  """List all harvesting types and their mappings.
1153 
1154  This lists all implemented harvesting types with their
1155  corresponding mappings to sequence names. This had to be
1156  separated out from the help since it depends on the CMSSW
1157  version and was making things a bit of a mess.
1158 
1159  NOTE: There is no way (at least not that I could come up with)
1160  to code this in a neat generic way that can be read both by
1161  this method and by setup_harvesting_info(). Please try hard to
1162  keep these two methods in sync!
1163 
1164  """
1165 
1166  sep_line = "-" * 50
1167  sep_line_short = "-" * 20
1168 
1169  print sep_line
1170  print "The following harvesting types are available:"
1171  print sep_line
1172 
1173  print "`RelVal' maps to:"
1174  print " pre-3_3_0 : HARVESTING:validationHarvesting"
1175  print " 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting"
1176  print " Exceptions:"
1177  print " 3_3_0_pre1-4 : HARVESTING:validationHarvesting"
1178  print " 3_3_0_pre6 : HARVESTING:validationHarvesting"
1179  print " 3_4_0_pre1 : HARVESTING:validationHarvesting"
1180 
1181  print sep_line_short
1182 
1183  print "`RelValFS' maps to:"
1184  print " always : HARVESTING:validationHarvestingFS"
1185 
1186  print sep_line_short
1187 
1188  print "`MC' maps to:"
1189  print " always : HARVESTING:validationprodHarvesting"
1190 
1191  print sep_line_short
1192 
1193  print "`DQMOffline' maps to:"
1194  print " always : HARVESTING:dqmHarvesting"
1195 
1196  print sep_line
1197 
1198  # We're done, let's quit. (This is the same thing optparse
1199  # does after printing the help.)
1200  raise SystemExit
1201 
1202  # End of option_handler_list_types.
1203 
1204  ##########
1205 
1207  """Fill our dictionary with all info needed to understand
1208  harvesting.
1209 
1210  This depends on the CMSSW version since at some point the
1211  names and sequences were modified.
1212 
1213  NOTE: There is no way (at least not that I could come up with)
1214  to code this in a neat generic way that can be read both by
1215  this method and by option_handler_list_types(). Please try
1216  hard to keep these two methods in sync!
1217 
1218  """
1219 
1220  assert not self.cmssw_version is None, \
1221  "ERROR setup_harvesting() requires " \
1222  "self.cmssw_version to be set!!!"
1223 
1224  harvesting_info = {}
1225 
1226  # This is the version-independent part.
1227  harvesting_info["DQMOffline"] = {}
1228  harvesting_info["DQMOffline"]["beamspot"] = None
1229  harvesting_info["DQMOffline"]["eventcontent"] = None
1230  harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"
1231 
1232  harvesting_info["RelVal"] = {}
1233  harvesting_info["RelVal"]["beamspot"] = None
1234  harvesting_info["RelVal"]["eventcontent"] = None
1235  harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"
1236 
1237  harvesting_info["RelValFS"] = {}
1238  harvesting_info["RelValFS"]["beamspot"] = None
1239  harvesting_info["RelValFS"]["eventcontent"] = None
1240  harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"
1241 
1242  harvesting_info["MC"] = {}
1243  harvesting_info["MC"]["beamspot"] = None
1244  harvesting_info["MC"]["eventcontent"] = None
1245  harvesting_info["MC"]["harvesting"] = "AtRunEnd"
1246 
1247  # This is the version-dependent part. And I know, strictly
1248  # speaking it's not necessary to fill in all three types since
1249  # in a single run we'll only use one type anyway. This does
1250  # look more readable, however, and required less thought from
1251  # my side when I put this together.
1252 
1253  # DEBUG DEBUG DEBUG
1254  # Check that we understand our own version naming.
1255  assert self.cmssw_version.startswith("CMSSW_")
1256  # DEBUG DEBUG DEBUG end
1257 
1258  version = self.cmssw_version[6:]
1259 
1260  #----------
1261 
1262  # RelVal
1263  step_string = None
1264  if version < "3_3_0":
1265  step_string = "validationHarvesting"
1266  elif version in ["3_3_0_pre1", "3_3_0_pre2",
1267  "3_3_0_pre3", "3_3_0_pre4",
1268  "3_3_0_pre6", "3_4_0_pre1"]:
1269  step_string = "validationHarvesting"
1270  else:
1271  step_string = "validationHarvesting+dqmHarvesting"
1272 
1273  harvesting_info["RelVal"]["step_string"] = step_string
1274 
1275  # DEBUG DEBUG DEBUG
1276  # Let's make sure we found something.
1277  assert not step_string is None, \
1278  "ERROR Could not decide a RelVal harvesting sequence " \
1279  "for CMSSW version %s" % self.cmssw_version
1280  # DEBUG DEBUG DEBUG end
1281 
1282  #----------
1283 
1284  # RelValFS
1285  step_string = "validationHarvestingFS"
1286 
1287  harvesting_info["RelValFS"]["step_string"] = step_string
1288 
1289  #----------
1290 
1291  # MC
1292  step_string = "validationprodHarvesting"
1293 
1294  harvesting_info["MC"]["step_string"] = step_string
1295 
1296  # DEBUG DEBUG DEBUG
1297  # Let's make sure we found something.
1298  assert not step_string is None, \
1299  "ERROR Could not decide a MC harvesting " \
1300  "sequence for CMSSW version %s" % self.cmssw_version
1301  # DEBUG DEBUG DEBUG end
1302 
1303  #----------
1304 
1305  # DQMOffline
1306  step_string = "dqmHarvesting"
1307 
1308  harvesting_info["DQMOffline"]["step_string"] = step_string
1309 
1310  #----------
1311 
1312  self.harvesting_info = harvesting_info
1313 
1314  self.logger.info("Based on the CMSSW version (%s) " \
1315  "I decided to use the `HARVESTING:%s' " \
1316  "sequence for %s harvesting" % \
1317  (self.cmssw_version,
1318  self.harvesting_info[self.harvesting_type]["step_string"],
1319  self.harvesting_type))
1320 
1321  # End of setup_harvesting_info.
1322 
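# Worked example: for self.cmssw_version = "CMSSW_3_8_2" the version
# string "3_8_2" falls through to the last RelVal branch, so
#
#   self.harvesting_info["RelVal"]["step_string"]
#     -> "validationHarvesting+dqmHarvesting"
#
# and the generated configuration will use the
# "HARVESTING:validationHarvesting+dqmHarvesting" sequence.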
1323  ##########
1324 
1325  def create_castor_path_name_common(self, dataset_name):
1326  """Build the common part of the output path to be used on
1327  CASTOR.
1328 
1329  This consists of the CASTOR area base path specified by the
1330  user and a piece depending on the data type (data vs. MC), the
1331  harvesting type and the dataset name followed by a piece
1332  containing the run number and event count. (See comments in
1333  create_castor_path_name_special for details.) This method
1334  creates the common part, without run number and event count.
1335 
1336  """
1337 
1338  castor_path = self.castor_base_dir
1339 
1340  ###
1341 
1342  # The data type: data vs. mc.
1343  datatype = self.datasets_information[dataset_name]["datatype"]
1344  datatype = datatype.lower()
1345  castor_path = os.path.join(castor_path, datatype)
1346 
1347  # The harvesting type.
1348  harvesting_type = self.harvesting_type
1349  harvesting_type = harvesting_type.lower()
1350  castor_path = os.path.join(castor_path, harvesting_type)
1351 
1352  # The CMSSW release version (only the `digits'). Note that the
1353  # CMSSW version used here is the version used for harvesting,
1354  # not the one from the dataset. This does make the results
1355  # slightly harder to find. On the other hand it solves
1356  # problems in case one re-harvests a given dataset with a
1357  # different CMSSW version, which would lead to ambiguous path
1358  # names. (Of course for many cases the harvesting is done with
1359  # the same CMSSW version the dataset was created with.)
1360  release_version = self.cmssw_version
1361  release_version = release_version.lower(). \
1362  replace("cmssw", ""). \
1363  strip("_")
1364  castor_path = os.path.join(castor_path, release_version)
1365 
1366  # The dataset name.
1367  dataset_name_escaped = self.escape_dataset_name(dataset_name)
1368  castor_path = os.path.join(castor_path, dataset_name_escaped)
1369 
1370  ###
1371 
1372  castor_path = os.path.normpath(castor_path)
1373 
1374  # End of create_castor_path_name_common.
1375  return castor_path
1376 
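# Worked example (dataset name and its escaping purely illustrative): with
# the default CASTOR base dir, an MC RelVal harvesting done with
# CMSSW_3_8_2 ends up under something like
#
#   /castor/cern.ch/cms/store/temp/dqm/offline/harvesting_output/
#       mc/relval/3_8_2/<escaped dataset name>
#
# where <escaped dataset name> is whatever escape_dataset_name() returns.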
1377  ##########
1378 
1379  def create_castor_path_name_special(self,
1380  dataset_name, run_number,
1381  castor_path_common):
1382  """Create the specialised part of the CASTOR output dir name.
1383 
1384  NOTE: To avoid clashes with `incremental harvesting'
1385  (re-harvesting when a dataset grows) we have to include the
1386  event count in the path name. The underlying `problem' is that
1387  CRAB does not overwrite existing output files so if the output
1388  file already exists CRAB will fail to copy back the output.
1389 
1390  NOTE: It's not possible to create different kinds of
1391  harvesting jobs in a single call to this tool. However, in
1392  principle it could be possible to create both data and MC jobs
1393  in a single go.
1394 
1395  NOTE: The number of events used in the path name is the
1396  _total_ number of events in the dataset/run at the time of
1397  harvesting. If we're doing partial harvesting the final
1398  results will reflect lower statistics. This is a) the easiest
1399  to code and b) the least likely to lead to confusion if
1400  someone ever decides to swap/copy around file blocks between
1401  sites.
1402 
1403  """
1404 
1405  castor_path = castor_path_common
1406 
1407  ###
1408 
1409  # The run number part.
1410  castor_path = os.path.join(castor_path, "run_%d" % run_number)
1411 
1412  ###
1413 
1414  # The event count (i.e. the number of events we currently see
1415  # for this dataset).
1416  #nevents = self.datasets_information[dataset_name] \
1417  # ["num_events"][run_number]
1418  castor_path = os.path.join(castor_path, "nevents")
1419 
1420  ###
1421 
1422  castor_path = os.path.normpath(castor_path)
1423 
1424  # End of create_castor_path_name_special.
1425  return castor_path
1426 
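# Worked example, continuing from create_castor_path_name_common(): for
# run 123456 the final output directory becomes
#
#   <common path>/run_123456/nevents
#
# (the literal "nevents" leaf, since the per-run event count is currently
# commented out above).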
1427  ##########
1428 
1430  """Make sure all required CASTOR output dirs exist.
1431 
1432  This checks the CASTOR base dir specified by the user as well
1433  as all the subdirs required by the current set of jobs.
1434 
1435  """
1436 
1437  self.logger.info("Checking (and if necessary creating) CASTOR " \
1438  "output area(s)...")
1439 
1440  # Call the real checker method for the base dir.
1441  self.create_and_check_castor_dir(self.castor_base_dir)
1442 
1443  # Now call the checker for all (unique) subdirs.
1444  castor_dirs = []
1445  for (dataset_name, runs) in self.datasets_to_use.iteritems():
1446 
1447  for run in runs:
1448  castor_dirs.append(self.datasets_information[dataset_name] \
1449  ["castor_path"][run])
1450  castor_dirs_unique = list(set(castor_dirs))
1451  castor_dirs_unique.sort()
1452  # This can take some time. E.g. CRAFT08 has > 300 runs, each
1453  # of which will get a new directory. So we show some (rough)
1454  # info in between.
1455  ndirs = len(castor_dirs_unique)
1456  step = max(ndirs / 10, 1)
1457  for (i, castor_dir) in enumerate(castor_dirs_unique):
1458  if (i + 1) % step == 0 or \
1459  (i + 1) == ndirs:
1460  self.logger.info(" %d/%d" % \
1461  (i + 1, ndirs))
1462  self.create_and_check_castor_dir(castor_dir)
1463 
1464  # Now check if the directory is empty. If (an old version
1465  # of) the output file already exists CRAB will run new
1466  # jobs but never copy the results back. We assume the user
1467  # knows what they are doing and only issue a warning in
1468  # case the directory is not empty.
1469  self.logger.debug("Checking if path `%s' is empty" % \
1470  castor_dir)
1471  cmd = "rfdir %s" % castor_dir
1472  (status, output) = commands.getstatusoutput(cmd)
1473  if status != 0:
1474  msg = "Could not access directory `%s'" \
1475  " !!! This is bad since I should have just" \
1476  " created it !!!" % castor_dir
1477  self.logger.fatal(msg)
1478  raise Error(msg)
1479  if len(output) > 0:
1480  self.logger.warning("Output directory `%s' is not empty:" \
1481  " new jobs will fail to" \
1482  " copy back output" % \
1483  castor_dir)
1484 
1485  # End of create_and_check_castor_dirs.
1486 
1487  ##########
1488 
1489  def create_and_check_castor_dir(self, castor_dir):
1490  """Check existence of the give CASTOR dir, if necessary create
1491  it.
1492 
1493  Some special care has to be taken with several things like
1494  setting the correct permissions such that CRAB can store the
1495  output results. Of course this means that things like
1496  /castor/cern.ch/ and user/j/ have to be recognised and treated
1497  properly.
1498 
1499  NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1500  the moment.
1501 
1502  NOTE: This method uses some slightly tricky caching to make
1503  sure we don't keep over and over checking the same base paths.
1504 
1505  """
1506 
1507  ###
1508 
1509  # Local helper function to fully split a path into pieces.
1510  def split_completely(path):
1511  (parent_path, name) = os.path.split(path)
1512  if name == "":
1513  return (parent_path, )
1514  else:
1515  return split_completely(parent_path) + (name, )
1516 
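# Worked example of the helper above:
#
#   split_completely("/castor/cern.ch/cms")
#     -> ("/", "castor", "cern.ch", "cms")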
1517  ###
1518 
1519  # Local helper function to check rfio (i.e. CASTOR)
1520  # directories.
1521  def extract_permissions(rfstat_output):
1522  """Parse the output from rfstat and return the
1523  5-digit permissions string."""
1524 
1525  permissions_line = [i for i in rfstat_output.split("\n") \
1526  if i.lower().find("protection") > -1]
1527  regexp = re.compile(".*\(([0123456789]{5})\).*")
1528  match = regexp.search(rfstat_output)
1529  if not match or len(match.groups()) != 1:
1530  msg = "Could not extract permissions " \
1531  "from output: %s" % rfstat_output
1532  self.logger.fatal(msg)
1533  raise Error(msg)
1534  permissions = match.group(1)
1535 
1536  # End of extract_permissions.
1537  return permissions
1538 
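# Illustrative sketch only (the exact rfstat output format is assumed
# here): a line like
#
#   Protection               : d rwxrwxr-x (40775)
#
# would make extract_permissions() return "40775"; the leading "40" is
# what the directory check below relies on.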
1539  ###
1540 
1541  # These are the pieces of CASTOR directories that we do not
1542  # want to touch when modifying permissions.
1543 
1544  # NOTE: This is all a bit involved, basically driven by the
1545  # fact that one wants to treat the `j' directory of
1546  # `/castor/cern.ch/user/j/jhegeman/' specially.
1547  # BUG BUG BUG
1548  # This should be simplified, for example by comparing to the
1549  # CASTOR prefix or something like that.
1550  # BUG BUG BUG end
1551  castor_paths_dont_touch = {
1552  0: ["/", "castor", "cern.ch", "cms", "store", "temp",
1553  "dqm", "offline", "user"],
1554  -1: ["user", "store"]
1555  }
1556 
1557  self.logger.debug("Checking CASTOR path `%s'" % castor_dir)
1558 
1559  ###
1560 
1561  # First we take the full CASTOR path apart.
1562  castor_path_pieces = split_completely(castor_dir)
1563 
1564  # Now slowly rebuild the CASTOR path and see if a) all
1565  # permissions are set correctly and b) the final destination
1566  # exists.
1567  path = ""
1568  check_sizes = castor_paths_dont_touch.keys()
1569  check_sizes.sort()
1570  len_castor_path_pieces = len(castor_path_pieces)
1571  for piece_index in xrange (len_castor_path_pieces):
1572  skip_this_path_piece = False
1573  piece = castor_path_pieces[piece_index]
1574 ## self.logger.debug("Checking CASTOR path piece `%s'" % \
1575 ## piece)
1576  for check_size in check_sizes:
1577  # Do we need to do anything with this?
1578  if (piece_index + check_size) > -1:
1579 ## self.logger.debug("Checking `%s' against `%s'" % \
1580 ## (castor_path_pieces[piece_index + check_size],
1581 ## castor_paths_dont_touch[check_size]))
1582  if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
1583 ## self.logger.debug(" skipping")
1584  skip_this_path_piece = True
1585 ## else:
1586 ## # Piece not in the list, fine.
1587 ## self.logger.debug(" accepting")
1588  # Add piece to the path we're building.
1589 ## self.logger.debug("!!! Skip path piece `%s'? %s" % \
1590 ## (piece, str(skip_this_path_piece)))
1591 ## self.logger.debug("Adding piece to path...")
1592  path = os.path.join(path, piece)
1593 ## self.logger.debug("Path is now `%s'" % \
1594 ## path)
1595 
1596  # Hmmmm, only at this point can we do some caching. Not
1597  # ideal, but okay.
1598  try:
1599  if path in self.castor_path_checks_cache:
1600  continue
1601  except AttributeError:
1602  # This only happens the first time around.
1603  self.castor_path_checks_cache = []
1604  self.castor_path_checks_cache.append(path)
1605 
1606  # Now, unless we're supposed to skip this piece of the
1607  # path, let's make sure it exists and set the permissions
1608  # correctly for use by CRAB. This means that:
1609  # - the final output directory should (at least) have
1610  # permissions 775
1611  # - all directories above that should (at least) have
1612  # permissions 755.
1613 
1614  # BUT: Even though the above permissions are the usual
1615  # ones to use when setting up CASTOR areas for grid job
1616  # output, there is one caveat in case multiple people are
1617  # working in the same CASTOR area. If user X creates
1618  # /a/b/c/ and user Y wants to create /a/b/d/ he/she does
1619  # not have sufficient rights. So: we set all dir
1620  # permissions to 775 to avoid this.
1621 
1622  if not skip_this_path_piece:
1623 
1624  # Ok, first thing: let's make sure this directory
1625  # exists.
1626  # NOTE: The nice complication is of course that the
1627  # usual os.path.isdir() etc. methods don't work for an
1628  # rfio filesystem. So we call rfstat and interpret an
1629  # error as meaning that the path does not exist.
1630  self.logger.debug("Checking if path `%s' exists" % \
1631  path)
1632  cmd = "rfstat %s" % path
1633  (status, output) = commands.getstatusoutput(cmd)
1634  if status != 0:
1635  # Path does not exist, let's try and create it.
1636  self.logger.debug("Creating path `%s'" % path)
1637  cmd = "nsmkdir -m 775 %s" % path
1638  (status, output) = commands.getstatusoutput(cmd)
1639  if status != 0:
1640  msg = "Could not create directory `%s'" % path
1641  self.logger.fatal(msg)
1642  raise Error(msg)
1643  cmd = "rfstat %s" % path
1644  (status, output) = commands.getstatusoutput(cmd)
1645  # Now check that it looks like a directory. If I'm not
1646  # mistaken one can deduce this from the fact that the
1647  # (octal) permissions string starts with `40' (instead
1648  # of `100').
1649  permissions = extract_permissions(output)
1650  if not permissions.startswith("40"):
1651  msg = "Path `%s' is not a directory(?)" % path
1652  self.logger.fatal(msg)
1653  raise Error(msg)
1654 
1655  # Figure out the current permissions for this
1656  # (partial) path.
1657  self.logger.debug("Checking permissions for path `%s'" % path)
1658  cmd = "rfstat %s" % path
1659  (status, output) = commands.getstatusoutput(cmd)
1660  if status != 0:
1661  msg = "Could not obtain permissions for directory `%s'" % \
1662  path
1663  self.logger.fatal(msg)
1664  raise Error(msg)
1665  # Take the last three digits of the permissions.
1666  permissions = extract_permissions(output)[-3:]
1667 
1668  # Now if necessary fix permissions.
1669  # NOTE: Be careful never to `downgrade' permissions.
1670  if piece_index == (len_castor_path_pieces - 1):
1671  # This means we're looking at the final
1672  # destination directory.
1673  permissions_target = "775"
1674  else:
1675  # `Only' an intermediate directory.
1676  permissions_target = "775"
1677 
1678  # Compare permissions.
1679  permissions_new = []
1680  for (i, j) in zip(permissions, permissions_target):
1681  permissions_new.append(str(max(int(i), int(j))))
1682  permissions_new = "".join(permissions_new)
1683  self.logger.debug(" current permissions: %s" % \
1684  permissions)
1685  self.logger.debug(" target permissions : %s" % \
1686  permissions_target)
1687  if permissions_new != permissions:
1688  # We have to modify the permissions.
1689  self.logger.debug("Changing permissions of `%s' " \
1690  "to %s (were %s)" % \
1691  (path, permissions_new, permissions))
1692  cmd = "rfchmod %s %s" % (permissions_new, path)
1693  (status, output) = commands.getstatusoutput(cmd)
1694  if status != 0:
1695  msg = "Could not change permissions for path `%s' " \
1696  "to %s" % (path, permissions_new)
1697  self.logger.fatal(msg)
1698  raise Error(msg)
1699 
1700  self.logger.debug(" Permissions ok (%s)" % permissions_new)
1701 
1702  # End of create_and_check_castor_dir.
1703 
1704  ##########
1705 
1706  def pick_a_site(self, sites, cmssw_version):
1707 
1708  # Create list of forbidden sites
1709  sites_forbidden = []
1710 
1711  if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
1712  self.caf_access = True
1713 
1714  if self.caf_access == False:
1715  sites_forbidden.append("caf.cern.ch")
1716 
1717  # These are the T1 sites. These are only forbidden if we're
1718  # running in non-T1 mode.
1719  # Source:
1720  # https://cmsweb.cern.ch/sitedb/sitelist/?naming_scheme=ce
1721  # Hard-coded, yes. Not nice, no.
1722 
1723  all_t1 = [
1724  "srm-cms.cern.ch",
1725  "ccsrm.in2p3.fr",
1726  "cmssrm-fzk.gridka.de",
1727  "cmssrm.fnal.gov",
1728  "gridka-dCache.fzk.de",
1729  "srm-cms.gridpp.rl.ac.uk",
1730  "srm.grid.sinica.edu.tw",
1731  "srm2.grid.sinica.edu.tw",
1732  "srmcms.pic.es",
1733  "storm-fe-cms.cr.cnaf.infn.it"
1734  ]
1735 
1736  country_codes = {
1737  "CAF" : "caf.cern.ch",
1738  "CH" : "srm-cms.cern.ch",
1739  "FR" : "ccsrm.in2p3.fr",
1740  "DE" : "cmssrm-fzk.gridka.de",
1741  "GOV" : "cmssrm.fnal.gov",
1742  "DE2" : "gridka-dCache.fzk.de",
1743  "UK" : "srm-cms.gridpp.rl.ac.uk",
1744  "TW" : "srm.grid.sinica.edu.tw",
1745  "TW2" : "srm2.grid.sinica.edu.tw",
1746  "ES" : "srmcms.pic.es",
1747  "IT" : "storm-fe-cms.cr.cnaf.infn.it"
1748  }
1749 
1750  if self.non_t1access:
1751  sites_forbidden.extend(all_t1)
1752 
1753  for site in sites_forbidden:
1754  if site in sites:
1755  sites.remove(site)
1756 
1757  if self.preferred_site in country_codes:
1758  self.preferred_site = country_codes[self.preferred_site]
1759 
1760  if self.preferred_site != "no preference":
1761  if self.preferred_site in sites:
1762  sites = [self.preferred_site]
1763  else:
1764  sites= []
1765 
1766  #print sites
1767 
1768  # Looks like we have to do some caching here, otherwise things
1769  # become waaaay toooo sloooooow. So that's what the
1770  # sites_and_versions_cache does.
1771 
1772  # NOTE: Keep this set to None!
1773  site_name = None
1774  cmd = None
1775  while len(sites) > 0 and \
1776  site_name is None:
1777 
1778  # Create list of t1_sites
1779  t1_sites = []
1780  for site in sites:
1781  if site in all_t1:
1782  t1_sites.append(site)
1783  if site == "caf.cern.ch":
1784  t1_sites.append(site)
1785 
 1786  # If available, pick the preferred site
1787  #if self.preferred_site in sites:
1788  # se_name = self.preferred_site
1789  # Else, if available pick t1 site
1790 
1791  if len(t1_sites) > 0:
1792  se_name = choice(t1_sites)
1793  # Else pick any site
1794  else:
1795  se_name = choice(sites)
1796 
1797  # But check that it hosts the CMSSW version we want.
1798 
1799  if se_name in self.sites_and_versions_cache and \
1800  cmssw_version in self.sites_and_versions_cache[se_name]:
1801  if self.sites_and_versions_cache[se_name][cmssw_version]:
1802  site_name = se_name
1803  break
1804  else:
1805  self.logger.debug(" --> rejecting site `%s'" % se_name)
1806  sites.remove(se_name)
1807 
1808  else:
1809  self.logger.info("Checking if site `%s' " \
1810  "has CMSSW version `%s'" % \
1811  (se_name, cmssw_version))
1812  self.sites_and_versions_cache[se_name] = {}
1813 
1814  # TODO TODO TODO
1815  # Test for SCRAM architecture removed as per request
1816  # from Andreas.
1817  # scram_arch = os.getenv("SCRAM_ARCH")
1818  # cmd = "lcg-info --list-ce " \
1819  # "--query '" \
1820  # "Tag=VO-cms-%s," \
1821  # "Tag=VO-cms-%s," \
1822  # "CEStatus=Production," \
1823  # "CloseSE=%s'" % \
1824  # (cmssw_version, scram_arch, se_name)
1825  # TODO TODO TODO end
1826 
1827  cmd = "lcg-info --list-ce " \
1828  "--query '" \
1829  "Tag=VO-cms-%s," \
1830  "CEStatus=Production," \
1831  "CloseSE=%s'" % \
1832  (cmssw_version, se_name)
1833  (status, output) = commands.getstatusoutput(cmd)
1834  if status != 0:
1835  self.logger.error("Could not check site information " \
1836  "for site `%s'" % se_name)
1837  else:
1838  if (len(output) > 0) or (se_name == "caf.cern.ch"):
1839  self.sites_and_versions_cache[se_name][cmssw_version] = True
1840  site_name = se_name
1841  break
1842  else:
1843  self.sites_and_versions_cache[se_name][cmssw_version] = False
1844  self.logger.debug(" --> rejecting site `%s'" % se_name)
1845  sites.remove(se_name)
1846 
 1847  if site_name is None:
 1848  self.logger.error(" --> no matching site found")
 1849  self.logger.error(" --> Your release or SCRAM " \
 1850  "architecture may not be available " \
 1851  "anywhere on the (LCG) grid.")
 1852  if not cmd is None:
 1853  self.logger.debug(" (command used: `%s')" % cmd)
1854  else:
1855  self.logger.debug(" --> selected site `%s'" % site_name)
1856 
1857  # Return something more descriptive (than `None') in case we
1858  # found nothing.
1859  if site_name is None:
1860  site_name = self.no_matching_site_found_str
1861  # Keep track of our global flag signifying that this
1862  # happened.
1863  self.all_sites_found = False
1864 
1865  # End of pick_a_site.
1866  return site_name
1867 
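  # Illustrative sketch (comment only) of the cache pick_a_site relies
  # on; the exact contents are assumptions based on how the cache is
  # filled above. It maps an SE name to a per-CMSSW-version boolean
  # saying whether that release is installed there:
  #
  #   self.sites_and_versions_cache = {
  #       "srm-cms.cern.ch": {"CMSSW_3_8_2": True},
  #       "srmcms.pic.es":   {"CMSSW_3_8_2": False},
  #   }
  #
  # A site is only selected once the (cached) lcg-info lookup confirms
  # that the requested CMSSW version is available there.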
1868  ##########
1869 
 1870  def parse_cmd_line_options(self):
 1871 
1872  # Set up the command line parser. Note that we fix up the help
1873  # formatter so that we can add some text pointing people to
1874  # the Twiki etc.
1875  parser = optparse.OptionParser(version="%s %s" % \
1876  ("%prog", self.version),
1877  formatter=CMSHarvesterHelpFormatter())
1878 
1879  self.option_parser = parser
1880 
1881  # The debug switch.
1882  parser.add_option("-d", "--debug",
1883  help="Switch on debug mode",
1884  action="callback",
1885  callback=self.option_handler_debug)
1886 
1887  # The quiet switch.
1888  parser.add_option("-q", "--quiet",
1889  help="Be less verbose",
1890  action="callback",
1891  callback=self.option_handler_quiet)
1892 
1893  # The force switch. If this switch is used sanity checks are
1894  # performed but failures do not lead to aborts. Use with care.
1895  parser.add_option("", "--force",
1896  help="Force mode. Do not abort on sanity check "
1897  "failures",
1898  action="callback",
1899  callback=self.option_handler_force)
1900 
1901  # Choose between the different kinds of harvesting.
1902  parser.add_option("", "--harvesting_type",
1903  help="Harvesting type: %s" % \
1904  ", ".join(self.harvesting_types),
1905  action="callback",
1906  callback=self.option_handler_harvesting_type,
1907  type="string",
1908  metavar="HARVESTING_TYPE")
1909 
1910  # Choose between single-step and two-step mode.
1911  parser.add_option("", "--harvesting_mode",
1912  help="Harvesting mode: %s (default = %s)" % \
1913  (", ".join(self.harvesting_modes),
1914  self.harvesting_mode_default),
1915  action="callback",
1916  callback=self.option_handler_harvesting_mode,
1917  type="string",
1918  metavar="HARVESTING_MODE")
1919 
1920  # Override the GlobalTag chosen by the cmsHarvester.
1921  parser.add_option("", "--globaltag",
 1922  help="GlobalTag to use. For MC the default is the one " \
 1923  "the dataset was created with; for data " \
 1924  "a GlobalTag has to be specified.",
1925  action="callback",
1926  callback=self.option_handler_globaltag,
1927  type="string",
1928  metavar="GLOBALTAG")
1929 
1930  # Allow switching off of reference histograms.
1931  parser.add_option("", "--no-ref-hists",
1932  help="Don't use any reference histograms",
1933  action="callback",
1934  callback=self.option_handler_no_ref_hists)
1935 
1936  # Allow the default (i.e. the one that should be used)
1937  # Frontier connection to be overridden.
1938  parser.add_option("", "--frontier-connection",
1939  help="Use this Frontier connection to find " \
1940  "GlobalTags and LocalTags (for reference " \
1941  "histograms).\nPlease only use this for " \
1942  "testing.",
1943  action="callback",
1944  callback=self.option_handler_frontier_connection,
1945  type="string",
1946  metavar="FRONTIER")
1947 
1948  # Similar to the above but specific to the Frontier connection
1949  # to be used for the GlobalTag.
1950  parser.add_option("", "--frontier-connection-for-globaltag",
1951  help="Use this Frontier connection to find " \
1952  "GlobalTags.\nPlease only use this for " \
1953  "testing.",
1954  action="callback",
1955  callback=self.option_handler_frontier_connection,
1956  type="string",
1957  metavar="FRONTIER")
1958 
1959  # Similar to the above but specific to the Frontier connection
1960  # to be used for the reference histograms.
1961  parser.add_option("", "--frontier-connection-for-refhists",
1962  help="Use this Frontier connection to find " \
1963  "LocalTags (for reference " \
1964  "histograms).\nPlease only use this for " \
1965  "testing.",
1966  action="callback",
1967  callback=self.option_handler_frontier_connection,
1968  type="string",
1969  metavar="FRONTIER")
1970 
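  # Illustrative example only (the real defaults live elsewhere in this
  # file and should normally not be overridden): a Frontier connection
  # passed via the options above would look something like
  #
  #   --frontier-connection-for-globaltag=frontier://FrontierProd/
  #
  # As the help text says, please only use these switches for testing.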
1971  # Option to specify the name (or a regexp) of the dataset(s)
1972  # to be used.
1973  parser.add_option("", "--dataset",
1974  help="Name (or regexp) of dataset(s) to process",
1975  action="callback",
1976  #callback=self.option_handler_dataset_name,
1977  callback=self.option_handler_input_spec,
1978  type="string",
1979  #dest="self.input_name",
1980  metavar="DATASET")
1981 
1982  # Option to specify the name (or a regexp) of the dataset(s)
1983  # to be ignored.
1984  parser.add_option("", "--dataset-ignore",
1985  help="Name (or regexp) of dataset(s) to ignore",
1986  action="callback",
1987  callback=self.option_handler_input_spec,
1988  type="string",
1989  metavar="DATASET-IGNORE")
1990 
1991  # Option to specify the name (or a regexp) of the run(s)
1992  # to be used.
1993  parser.add_option("", "--runs",
1994  help="Run number(s) to process",
1995  action="callback",
1996  callback=self.option_handler_input_spec,
1997  type="string",
1998  metavar="RUNS")
1999 
2000  # Option to specify the name (or a regexp) of the run(s)
2001  # to be ignored.
2002  parser.add_option("", "--runs-ignore",
2003  help="Run number(s) to ignore",
2004  action="callback",
2005  callback=self.option_handler_input_spec,
2006  type="string",
2007  metavar="RUNS-IGNORE")
2008 
2009  # Option to specify a file containing a list of dataset names
2010  # (or regexps) to be used.
2011  parser.add_option("", "--datasetfile",
2012  help="File containing list of dataset names " \
2013  "(or regexps) to process",
2014  action="callback",
2015  #callback=self.option_handler_listfile_name,
2016  callback=self.option_handler_input_spec,
2017  type="string",
2018  #dest="self.input_name",
2019  metavar="DATASETFILE")
2020 
2021  # Option to specify a file containing a list of dataset names
2022  # (or regexps) to be ignored.
2023  parser.add_option("", "--datasetfile-ignore",
2024  help="File containing list of dataset names " \
2025  "(or regexps) to ignore",
2026  action="callback",
2027  callback=self.option_handler_input_spec,
2028  type="string",
2029  metavar="DATASETFILE-IGNORE")
2030 
2031  # Option to specify a file containing a list of runs to be
2032  # used.
2033  parser.add_option("", "--runslistfile",
2034  help="File containing list of run numbers " \
2035  "to process",
2036  action="callback",
2037  callback=self.option_handler_input_spec,
2038  type="string",
2039  metavar="RUNSLISTFILE")
2040 
2041  # Option to specify a file containing a list of runs
2042  # to be ignored.
2043  parser.add_option("", "--runslistfile-ignore",
2044  help="File containing list of run numbers " \
2045  "to ignore",
2046  action="callback",
2047  callback=self.option_handler_input_spec,
2048  type="string",
2049  metavar="RUNSLISTFILE-IGNORE")
2050 
 2051  # Option to specify a Jsonfile containing a list of runs
2052  # to be used.
2053  parser.add_option("", "--Jsonrunfile",
2054  help="Jsonfile containing dictionary of run/lumisections pairs. " \
2055  "All lumisections of runs contained in dictionary are processed.",
2056  action="callback",
2057  callback=self.option_handler_input_Jsonrunfile,
2058  type="string",
2059  metavar="JSONRUNFILE")
2060 
 2061  # Option to specify a Jsonfile containing a dictionary of run/lumisections pairs
2062  # to be used.
2063  parser.add_option("", "--Jsonfile",
2064  help="Jsonfile containing dictionary of run/lumisections pairs. " \
2065  "Only specified lumisections of runs contained in dictionary are processed.",
2066  action="callback",
2067  callback=self.option_handler_input_Jsonfile,
2068  type="string",
2069  metavar="JSONFILE")
2070 
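  # Illustrative example of the JSON expected by --Jsonfile and
  # --Jsonrunfile: the standard CMS run/lumisection JSON, i.e. run
  # numbers as keys and lists of lumisection ranges as values (the
  # numbers below are made up):
  #
  #   {"163286": [[1, 50], [75, 120]],
  #    "163289": [[1, 228]]}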
 2071  # Option to specify a ToDo file containing a list of runs
2072  # to be used.
2073  parser.add_option("", "--todo-file",
2074  help="Todo file containing a list of runs to process.",
2075  action="callback",
2076  callback=self.option_handler_input_todofile,
2077  type="string",
2078  metavar="TODO-FILE")
2079 
2080  # Option to specify which file to use for the dataset name to
2081  # reference histogram name mappings.
2082  parser.add_option("", "--refhistmappingfile",
 2083  help="File to be used for the reference " \
2084  "histogram mappings. Default: `%s'." % \
2085  self.ref_hist_mappings_file_name_default,
2086  action="callback",
2087  callback=self.option_handler_ref_hist_mapping_file,
2088  type="string",
2089  metavar="REFHISTMAPPING-FILE")
2090 
2091  # Specify the place in CASTOR where the output should go.
2092  # NOTE: Only output to CASTOR is supported for the moment,
2093  # since the central DQM results place is on CASTOR anyway.
2094  parser.add_option("", "--castordir",
2095  help="Place on CASTOR to store results. " \
2096  "Default: `%s'." % \
2097  self.castor_base_dir_default,
2098  action="callback",
2099  callback=self.option_handler_castor_dir,
2100  type="string",
2101  metavar="CASTORDIR")
2102 
2103  # Use this to try and create jobs that will run without
2104  # special `t1access' role.
2105  parser.add_option("", "--no-t1access",
2106  help="Try to create jobs that will run " \
2107  "without special `t1access' role",
2108  action="callback",
2109  callback=self.option_handler_no_t1access)
2110 
2111  # Use this to create jobs that may run on CAF
2112  parser.add_option("", "--caf-access",
2113  help="Crab jobs may run " \
2114  "on CAF",
2115  action="callback",
2116  callback=self.option_handler_caf_access)
2117 
2118  # set process.dqmSaver.saveByLumiSection=1 in harvesting cfg file
2119  parser.add_option("", "--saveByLumiSection",
2120  help="set saveByLumiSection=1 in harvesting cfg file",
2121  action="callback",
2122  callback=self.option_handler_saveByLumiSection)
2123 
2124  # Use this to enable automatic creation and submission of crab jobs
2125  parser.add_option("", "--automatic-crab-submission",
2126  help="Crab jobs are created and " \
2127  "submitted automatically",
2128  action="callback",
2129  callback=self.option_handler_crab_submission)
2130 
2131  # Option to set the max number of sites, each
2132  #job is submitted to
2133  parser.add_option("", "--max-sites",
2134  help="Max. number of sites each job is submitted to",
2135  action="callback",
2136  callback=self.option_handler_sites,
2137  type="int")
2138 
2139  # Option to set the preferred site
2140  parser.add_option("", "--site",
2141  help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
2142  srm-cms.cern.ch : CH \
2143  ccsrm.in2p3.fr : FR \
2144  cmssrm-fzk.gridka.de : DE \
2145  cmssrm.fnal.gov : GOV \
2146  gridka-dCache.fzk.de : DE2 \
 2147  srm-cms.gridpp.rl.ac.uk : UK \
2148  srm.grid.sinica.edu.tw : TW \
2149  srm2.grid.sinica.edu.tw : TW2 \
2150  srmcms.pic.es : ES \
2151  storm-fe-cms.cr.cnaf.infn.it : IT",
2152  action="callback",
2153  callback=self.option_handler_preferred_site,
2154  type="string")
2155 
2156  # This is the command line flag to list all harvesting
2157  # type-to-sequence mappings.
2158  parser.add_option("-l", "--list",
 2159  help="List all harvesting types and their " \
 2160  "corresponding sequence names",
2161  action="callback",
2162  callback=self.option_handler_list_types)
2163 
2164  # If nothing was specified: tell the user how to do things the
2165  # next time and exit.
2166  # NOTE: We just use the OptParse standard way of doing this by
2167  # acting as if a '--help' was specified.
2168  if len(self.cmd_line_opts) < 1:
2169  self.cmd_line_opts = ["--help"]
2170 
2171  # Some trickery with the options. Why? Well, since these
2172  # options change the output level immediately from the option
2173  # handlers, the results differ depending on where they are on
2174  # the command line. Let's just make sure they are at the
2175  # front.
2176  # NOTE: Not very efficient or sophisticated, but it works and
2177  # it only has to be done once anyway.
2178  for i in ["-d", "--debug",
2179  "-q", "--quiet"]:
2180  if i in self.cmd_line_opts:
2181  self.cmd_line_opts.remove(i)
2182  self.cmd_line_opts.insert(0, i)
2183 
2184  # Everything is set up, now parse what we were given.
2185  parser.set_defaults()
2186  (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2187 
2188  # End of parse_cmd_line_options.
2189 
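  # Illustrative example of a typical invocation built from the options
  # defined above (dataset name and GlobalTag are made up):
  #
  #   cmsHarvester.py --harvesting_type=DQMOffline \
  #                   --harvesting_mode=single-step \
  #                   --dataset=/Cosmics/Run2010A-v1/RECO \
  #                   --globaltag=GR_R_38X_V13::All \
  #                   --site=CH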
2190  ##########
2191 
 2192  def check_input_status(self):
 2193  """Check completeness and correctness of input information.
2194 
2195  Check that all required information has been specified and
2196  that, at least as far as can be easily checked, it makes
2197  sense.
2198 
2199  NOTE: This is also where any default values are applied.
2200 
2201  """
2202 
2203  self.logger.info("Checking completeness/correctness of input...")
2204 
2205  # The cmsHarvester does not take (i.e. understand) any
2206  # arguments so there should not be any.
2207  if len(self.args) > 0:
2208  msg = "Sorry but I don't understand `%s'" % \
2209  (" ".join(self.args))
2210  self.logger.fatal(msg)
2211  raise Usage(msg)
2212 
2213  # BUG BUG BUG
2214  # While we wait for some bugs left and right to get fixed, we
2215  # disable two-step.
2216  if self.harvesting_mode == "two-step":
2217  msg = "--------------------\n" \
2218  " Sorry, but for the moment (well, till it works)" \
2219  " the two-step mode has been disabled.\n" \
2220  "--------------------\n"
2221  self.logger.fatal(msg)
2222  raise Error(msg)
2223  # BUG BUG BUG end
2224 
2225  # We need a harvesting method to be specified
2226  if self.harvesting_type is None:
2227  msg = "Please specify a harvesting type"
2228  self.logger.fatal(msg)
2229  raise Usage(msg)
2230  # as well as a harvesting mode.
2231  if self.harvesting_mode is None:
2232  self.harvesting_mode = self.harvesting_mode_default
2233  msg = "No harvesting mode specified --> using default `%s'" % \
2234  self.harvesting_mode
2235  self.logger.warning(msg)
2236  #raise Usage(msg)
2237 
2238  ###
2239 
2240  # We need an input method so we can find the dataset name(s).
2241  if self.input_method["datasets"]["use"] is None:
2242  msg = "Please specify an input dataset name " \
2243  "or a list file name"
2244  self.logger.fatal(msg)
2245  raise Usage(msg)
2246 
2247  # DEBUG DEBUG DEBUG
2248  # If we get here, we should also have an input name.
2249  assert not self.input_name["datasets"]["use"] is None
2250  # DEBUG DEBUG DEBUG end
2251 
2252  ###
2253 
2254  # The same holds for the reference histogram mapping file (if
2255  # we're using references).
2256  if self.use_ref_hists:
2257  if self.ref_hist_mappings_file_name is None:
2258  self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2259  msg = "No reference histogram mapping file specified --> " \
2260  "using default `%s'" % \
2261  self.ref_hist_mappings_file_name
2262  self.logger.warning(msg)
2263 
2264  ###
2265 
2266  # We need to know where to put the stuff (okay, the results)
2267  # on CASTOR.
2268  if self.castor_base_dir is None:
2269  self.castor_base_dir = self.castor_base_dir_default
 2270  msg = "No CASTOR area specified --> using default `%s'" % \
2271  self.castor_base_dir
2272  self.logger.warning(msg)
2273  #raise Usage(msg)
2274 
2275  # Only the CERN CASTOR area is supported.
2276  if not self.castor_base_dir.startswith(self.castor_prefix):
2277  msg = "CASTOR area does not start with `%s'" % \
2278  self.castor_prefix
2279  self.logger.fatal(msg)
2280  if self.castor_base_dir.find("castor") > -1 and \
2281  not self.castor_base_dir.find("cern.ch") > -1:
2282  self.logger.fatal("Only CERN CASTOR is supported")
2283  raise Usage(msg)
2284 
2285  ###
2286 
2287  # TODO TODO TODO
2288  # This should be removed in the future, once I find out how to
2289  # get the config file used to create a given dataset from DBS.
2290 
2291  # For data we need to have a GlobalTag. (For MC we can figure
2292  # it out by ourselves.)
2293  if self.globaltag is None:
2294  self.logger.warning("No GlobalTag specified. This means I cannot")
2295  self.logger.warning("run on data, only on MC.")
2296  self.logger.warning("I will skip all data datasets.")
2297 
2298  # TODO TODO TODO end
2299 
2300  # Make sure the GlobalTag ends with `::All'.
2301  if not self.globaltag is None:
2302  if not self.globaltag.endswith("::All"):
2303  self.logger.warning("Specified GlobalTag `%s' does " \
2304  "not end in `::All' --> " \
2305  "appending this missing piece" % \
2306  self.globaltag)
2307  self.globaltag = "%s::All" % self.globaltag
2308 
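  # Quick illustration of the `::All' fix-up above (the tag name is
  # made up): a GlobalTag given as `GR_R_38X_V13' is silently turned
  # into `GR_R_38X_V13::All', while one already ending in `::All' is
  # left untouched.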
2309  ###
2310 
2311  # Dump some info about the Frontier connections used.
2312  for (key, value) in self.frontier_connection_name.iteritems():
2313  frontier_type_str = "unknown"
2314  if key == "globaltag":
2315  frontier_type_str = "the GlobalTag"
2316  elif key == "refhists":
2317  frontier_type_str = "the reference histograms"
2318  non_str = None
2319  if self.frontier_connection_overridden[key] == True:
2320  non_str = "non-"
2321  else:
2322  non_str = ""
2323  self.logger.info("Using %sdefault Frontier " \
2324  "connection for %s: `%s'" % \
2325  (non_str, frontier_type_str, value))
2326 
2327  ###
2328 
2329  # End of check_input_status.
2330 
2331  ##########
2332 
2333  def check_cmssw(self):
2334  """Check if CMSSW is setup.
2335 
2336  """
2337 
2338  # Try to access the CMSSW_VERSION environment variable. If
2339  # it's something useful we consider CMSSW to be set up
2340  # properly. Otherwise we raise an error.
2341  cmssw_version = os.getenv("CMSSW_VERSION")
2342  if cmssw_version is None:
2343  self.logger.fatal("It seems CMSSW is not setup...")
2344  self.logger.fatal("($CMSSW_VERSION is empty)")
2345  raise Error("ERROR: CMSSW needs to be setup first!")
2346 
2347  self.cmssw_version = cmssw_version
2348  self.logger.info("Found CMSSW version %s properly set up" % \
2349  self.cmssw_version)
2350 
 2351  # End of check_cmssw.
2352  return True
2353 
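  # Illustrative example: with a release properly set up, the
  # environment variable checked above contains something like
  #
  #   CMSSW_VERSION=CMSSW_3_8_2
  #
  # (version string chosen to match the release this script targets).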
2354  ##########
2355 
2356  def check_dbs(self):
2357  """Check if DBS is setup.
2358 
2359  """
2360 
2361  # Try to access the DBSCMD_HOME environment variable. If this
2362  # looks useful we consider DBS to be set up
2363  # properly. Otherwise we raise an error.
2364  dbs_home = os.getenv("DBSCMD_HOME")
2365  if dbs_home is None:
2366  self.logger.fatal("It seems DBS is not setup...")
2367  self.logger.fatal(" $DBSCMD_HOME is empty")
2368  raise Error("ERROR: DBS needs to be setup first!")
2369 
2370 ## # Now we try to do a very simple DBS search. If that works
2371 ## # instead of giving us the `Unsupported API call' crap, we
2372 ## # should be good to go.
2373 ## # NOTE: Not ideal, I know, but it reduces the amount of
2374 ## # complaints I get...
2375 ## cmd = "dbs search --query=\"find dataset where dataset = impossible\""
2376 ## (status, output) = commands.getstatusoutput(cmd)
2377 ## pdb.set_trace()
2378 ## if status != 0 or \
2379 ## output.lower().find("unsupported api call") > -1:
2380 ## self.logger.fatal("It seems DBS is not setup...")
2381 ## self.logger.fatal(" %s returns crap:" % cmd)
2382 ## for line in output.split("\n"):
2383 ## self.logger.fatal(" %s" % line)
2384 ## raise Error("ERROR: DBS needs to be setup first!")
2385 
2386  self.logger.debug("Found DBS properly set up")
2387 
2388  # End of check_dbs.
2389  return True
2390 
2391  ##########
2392 
2393  def setup_dbs(self):
2394  """Setup the Python side of DBS.
2395 
2396  For more information see the DBS Python API documentation:
2397  https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2398 
2399  """
2400 
2401  try:
2402  args={}
2403  args["url"]= "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2404  "servlet/DBSServlet"
2405  api = DbsApi(args)
2406  self.dbs_api = api
2407 
2408  except DBSAPI.dbsApiException.DbsApiException as ex:
2409  self.logger.fatal("Caught DBS API exception %s: %s " % \
2410  (ex.getClassName(), ex.getErrorMessage()))
2411  if ex.getErrorCode() not in (None, ""):
 2412  self.logger.debug("DBS exception error code: %s" % ex.getErrorCode())
2413  raise
2414 
2415  # End of setup_dbs.
2416 
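  # Once setup_dbs() has run, the cached API object is what all the
  # dbs_resolve_* methods below use. A minimal, purely illustrative
  # query in the same DBS query language would be:
  #
  #   api_result = self.dbs_api.executeQuery(
  #       "find dataset where dataset like /Cosmics/*/RECO "
  #       "and dataset.status = VALID")
  #
  # The result is an XML string that is then handed to DBSXMLHandler.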
2417  ##########
2418 
2419  def dbs_resolve_dataset_name(self, dataset_name):
2420  """Use DBS to resolve a wildcarded dataset name.
2421 
2422  """
2423 
2424  # DEBUG DEBUG DEBUG
2425  # If we get here DBS should have been set up already.
2426  assert not self.dbs_api is None
2427  # DEBUG DEBUG DEBUG end
2428 
2429  # Some minor checking to make sure that whatever we've been
2430  # given as dataset name actually sounds like a dataset name.
2431  if not (dataset_name.startswith("/") and \
2432  dataset_name.endswith("RECO")):
2433  self.logger.warning("Dataset name `%s' does not sound " \
2434  "like a valid dataset name!" % \
2435  dataset_name)
2436 
2437  #----------
2438 
2439  api = self.dbs_api
2440  dbs_query = "find dataset where dataset like %s " \
2441  "and dataset.status = VALID" % \
2442  dataset_name
2443  try:
2444  api_result = api.executeQuery(dbs_query)
2445  except DBSAPI.dbsApiException.DbsApiException:
2446  msg = "ERROR: Could not execute DBS query"
2447  self.logger.fatal(msg)
2448  raise Error(msg)
2449 
2450  # Setup parsing.
2451  handler = DBSXMLHandler(["dataset"])
2452  parser = xml.sax.make_parser()
2453  parser.setContentHandler(handler)
2454 
2455  # Parse.
2456  try:
2457  xml.sax.parseString(api_result, handler)
2458  except SAXParseException:
2459  msg = "ERROR: Could not parse DBS server output"
2460  self.logger.fatal(msg)
2461  raise Error(msg)
2462 
2463  # DEBUG DEBUG DEBUG
2464  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2465  # DEBUG DEBUG DEBUG end
2466 
2467  # Extract the results.
2468  datasets = handler.results.values()[0]
2469 
2470  # End of dbs_resolve_dataset_name.
2471  return datasets
2472 
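  # Illustrative example of the wildcard resolution above (dataset
  # names are made up): a call like
  #
  #   self.dbs_resolve_dataset_name("/Cosmics/*/RECO")
  #
  # returns a plain list of fully specified dataset names, e.g.
  #
  #   ["/Cosmics/Run2010A-v1/RECO", "/Cosmics/Run2010B-v1/RECO"]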
2473  ##########
2474 
2475  def dbs_resolve_cmssw_version(self, dataset_name):
2476  """Ask DBS for the CMSSW version used to create this dataset.
2477 
2478  """
2479 
2480  # DEBUG DEBUG DEBUG
2481  # If we get here DBS should have been set up already.
2482  assert not self.dbs_api is None
2483  # DEBUG DEBUG DEBUG end
2484 
2485  api = self.dbs_api
2486  dbs_query = "find algo.version where dataset = %s " \
2487  "and dataset.status = VALID" % \
2488  dataset_name
2489  try:
2490  api_result = api.executeQuery(dbs_query)
2491  except DBSAPI.dbsApiException.DbsApiException:
2492  msg = "ERROR: Could not execute DBS query"
2493  self.logger.fatal(msg)
2494  raise Error(msg)
2495 
2496  handler = DBSXMLHandler(["algo.version"])
2497  parser = xml.sax.make_parser()
2498  parser.setContentHandler(handler)
2499 
2500  try:
2501  xml.sax.parseString(api_result, handler)
2502  except SAXParseException:
2503  msg = "ERROR: Could not parse DBS server output"
2504  self.logger.fatal(msg)
2505  raise Error(msg)
2506 
2507  # DEBUG DEBUG DEBUG
2508  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2509  # DEBUG DEBUG DEBUG end
2510 
2511  cmssw_version = handler.results.values()[0]
2512 
2513  # DEBUG DEBUG DEBUG
2514  assert len(cmssw_version) == 1
2515  # DEBUG DEBUG DEBUG end
2516 
2517  cmssw_version = cmssw_version[0]
2518 
2519  # End of dbs_resolve_cmssw_version.
2520  return cmssw_version
2521 
2522  ##########
2523 
2524 ## def dbs_resolve_dataset_number_of_events(self, dataset_name):
2525 ## """Ask DBS across how many events this dataset has been spread
2526 ## out.
2527 
2528 ## This is especially useful to check that we do not submit a job
2529 ## supposed to run on a complete sample that is not contained at
2530 ## a single site.
2531 
2532 ## """
2533 
2534 ## # DEBUG DEBUG DEBUG
2535 ## # If we get here DBS should have been set up already.
2536 ## assert not self.dbs_api is None
2537 ## # DEBUG DEBUG DEBUG end
2538 
2539 ## api = self.dbs_api
2540 ## dbs_query = "find count(site) where dataset = %s " \
2541 ## "and dataset.status = VALID" % \
2542 ## dataset_name
2543 ## try:
2544 ## api_result = api.executeQuery(dbs_query)
2545 ## except DbsApiException:
2546 ## raise Error("ERROR: Could not execute DBS query")
2547 
2548 ## try:
2549 ## num_events = []
2550 ## class Handler(xml.sax.handler.ContentHandler):
2551 ## def startElement(self, name, attrs):
2552 ## if name == "result":
2553 ## num_events.append(str(attrs["COUNT_STORAGEELEMENT"]))
2554 ## xml.sax.parseString(api_result, Handler())
2555 ## except SAXParseException:
2556 ## raise Error("ERROR: Could not parse DBS server output")
2557 
2558 ## # DEBUG DEBUG DEBUG
2559 ## assert len(num_events) == 1
2560 ## # DEBUG DEBUG DEBUG end
2561 
2562 ## num_events = int(num_events[0])
2563 
2564 ## # End of dbs_resolve_dataset_number_of_events.
2565 ## return num_events
2566 
2567  ##########
2568 
2569  def dbs_resolve_runs(self, dataset_name):
2570  """Ask DBS for the list of runs in a given dataset.
2571 
2572  # NOTE: This does not (yet?) skip/remove empty runs. There is
2573  # a bug in the DBS entry run.numevents (i.e. it always returns
2574  # zero) which should be fixed in the `next DBS release'.
2575  # See also:
2576  # https://savannah.cern.ch/bugs/?53452
2577  # https://savannah.cern.ch/bugs/?53711
2578 
2579  """
2580 
2581  # TODO TODO TODO
2582  # We should remove empty runs as soon as the above mentioned
2583  # bug is fixed.
2584  # TODO TODO TODO end
2585 
2586  # DEBUG DEBUG DEBUG
2587  # If we get here DBS should have been set up already.
2588  assert not self.dbs_api is None
2589  # DEBUG DEBUG DEBUG end
2590 
2591  api = self.dbs_api
2592  dbs_query = "find run where dataset = %s " \
2593  "and dataset.status = VALID" % \
2594  dataset_name
2595  try:
2596  api_result = api.executeQuery(dbs_query)
2597  except DBSAPI.dbsApiException.DbsApiException:
2598  msg = "ERROR: Could not execute DBS query"
2599  self.logger.fatal(msg)
2600  raise Error(msg)
2601 
2602  handler = DBSXMLHandler(["run"])
2603  parser = xml.sax.make_parser()
2604  parser.setContentHandler(handler)
2605 
2606  try:
2607  xml.sax.parseString(api_result, handler)
2608  except SAXParseException:
2609  msg = "ERROR: Could not parse DBS server output"
2610  self.logger.fatal(msg)
2611  raise Error(msg)
2612 
2613  # DEBUG DEBUG DEBUG
2614  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2615  # DEBUG DEBUG DEBUG end
2616 
2617  runs = handler.results.values()[0]
2618  # Turn strings into integers.
2619  runs = [int(i) for i in runs]
2620  runs.sort()
2621 
2622  # End of dbs_resolve_runs.
2623  return runs
2624 
2625  ##########
2626 
2627  def dbs_resolve_globaltag(self, dataset_name):
2628  """Ask DBS for the globaltag corresponding to a given dataset.
2629 
2630  # BUG BUG BUG
2631  # This does not seem to work for data datasets? E.g. for
2632  # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
 2633  # Probably due to the fact that the GlobalTag changed during
 2634  # data taking...
 2635  # BUG BUG BUG end
2636 
2637  """
2638 
2639  # DEBUG DEBUG DEBUG
2640  # If we get here DBS should have been set up already.
2641  assert not self.dbs_api is None
2642  # DEBUG DEBUG DEBUG end
2643 
2644  api = self.dbs_api
2645  dbs_query = "find dataset.tag where dataset = %s " \
2646  "and dataset.status = VALID" % \
2647  dataset_name
2648  try:
2649  api_result = api.executeQuery(dbs_query)
2650  except DBSAPI.dbsApiException.DbsApiException:
2651  msg = "ERROR: Could not execute DBS query"
2652  self.logger.fatal(msg)
2653  raise Error(msg)
2654 
2655  handler = DBSXMLHandler(["dataset.tag"])
2656  parser = xml.sax.make_parser()
 2657  parser.setContentHandler(handler)
2658 
2659  try:
2660  xml.sax.parseString(api_result, handler)
2661  except SAXParseException:
2662  msg = "ERROR: Could not parse DBS server output"
2663  self.logger.fatal(msg)
2664  raise Error(msg)
2665 
2666  # DEBUG DEBUG DEBUG
2667  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2668  # DEBUG DEBUG DEBUG end
2669 
2670  globaltag = handler.results.values()[0]
2671 
2672  # DEBUG DEBUG DEBUG
2673  assert len(globaltag) == 1
2674  # DEBUG DEBUG DEBUG end
2675 
2676  globaltag = globaltag[0]
2677 
2678  # End of dbs_resolve_globaltag.
2679  return globaltag
2680 
2681  ##########
2682 
2683  def dbs_resolve_datatype(self, dataset_name):
 2684  """Ask DBS for the data type (data or mc) of a given
2685  dataset.
2686 
2687  """
2688 
2689  # DEBUG DEBUG DEBUG
2690  # If we get here DBS should have been set up already.
2691  assert not self.dbs_api is None
2692  # DEBUG DEBUG DEBUG end
2693 
2694  api = self.dbs_api
2695  dbs_query = "find datatype.type where dataset = %s " \
2696  "and dataset.status = VALID" % \
2697  dataset_name
2698  try:
2699  api_result = api.executeQuery(dbs_query)
2700  except DBSAPI.dbsApiException.DbsApiException:
2701  msg = "ERROR: Could not execute DBS query"
2702  self.logger.fatal(msg)
2703  raise Error(msg)
2704 
2705  handler = DBSXMLHandler(["datatype.type"])
2706  parser = xml.sax.make_parser()
2707  parser.setContentHandler(handler)
2708 
2709  try:
2710  xml.sax.parseString(api_result, handler)
2711  except SAXParseException:
2712  msg = "ERROR: Could not parse DBS server output"
2713  self.logger.fatal(msg)
2714  raise Error(msg)
2715 
2716  # DEBUG DEBUG DEBUG
2717  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2718  # DEBUG DEBUG DEBUG end
2719 
2720  datatype = handler.results.values()[0]
2721 
2722  # DEBUG DEBUG DEBUG
2723  assert len(datatype) == 1
2724  # DEBUG DEBUG DEBUG end
2725 
2726  datatype = datatype[0]
2727 
2728  # End of dbs_resolve_datatype.
2729  return datatype
2730 
2731  ##########
2732 
2733  # OBSOLETE OBSOLETE OBSOLETE
2734  # This method is no longer used.
2735 
2736  def dbs_resolve_number_of_events(self, dataset_name, run_number=None):
2737  """Determine the number of events in a given dataset (and run).
2738 
2739  Ask DBS for the number of events in a dataset. If a run number
2740  is specified the number of events returned is that in that run
2741  of that dataset. If problems occur we throw an exception.
2742 
2743  # BUG BUG BUG
2744  # Since DBS does not return the number of events correctly,
2745  # neither for runs nor for whole datasets, we have to work
2746  # around that a bit...
2747  # BUG BUG BUG end
2748 
2749  """
2750 
2751  # DEBUG DEBUG DEBUG
2752  # If we get here DBS should have been set up already.
2753  assert not self.dbs_api is None
2754  # DEBUG DEBUG DEBUG end
2755 
2756  api = self.dbs_api
2757  dbs_query = "find file.name, file.numevents where dataset = %s " \
2758  "and dataset.status = VALID" % \
2759  dataset_name
2760  if not run_number is None:
 2761  dbs_query = dbs_query + (" and run = %d" % run_number)
2762  try:
2763  api_result = api.executeQuery(dbs_query)
2764  except DBSAPI.dbsApiException.DbsApiException:
2765  msg = "ERROR: Could not execute DBS query"
2766  self.logger.fatal(msg)
2767  raise Error(msg)
2768 
2769  handler = DBSXMLHandler(["file.name", "file.numevents"])
2770  parser = xml.sax.make_parser()
2771  parser.setContentHandler(handler)
2772 
2773  try:
2774  xml.sax.parseString(api_result, handler)
2775  except SAXParseException:
2776  msg = "ERROR: Could not parse DBS server output"
2777  self.logger.fatal(msg)
2778  raise Error(msg)
2779 
2780  # DEBUG DEBUG DEBUG
2781  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2782  # DEBUG DEBUG DEBUG end
2783 
2784  num_events = sum(handler.results["file.numevents"])
2785 
2786  # End of dbs_resolve_number_of_events.
2787  return num_events
2788 
2789  # OBSOLETE OBSOLETE OBSOLETE end
2790 
2791  ##########
2792 
2793 ## def dbs_resolve_dataset_number_of_sites(self, dataset_name):
2794 ## """Ask DBS across how many sites this dataset has been spread
2795 ## out.
2796 
2797 ## This is especially useful to check that we do not submit a job
2798 ## supposed to run on a complete sample that is not contained at
2799 ## a single site.
2800 
2801 ## """
2802 
2803 ## # DEBUG DEBUG DEBUG
2804 ## # If we get here DBS should have been set up already.
2805 ## assert not self.dbs_api is None
2806 ## # DEBUG DEBUG DEBUG end
2807 
2808 ## api = self.dbs_api
2809 ## dbs_query = "find count(site) where dataset = %s " \
2810 ## "and dataset.status = VALID" % \
2811 ## dataset_name
2812 ## try:
2813 ## api_result = api.executeQuery(dbs_query)
2814 ## except DbsApiException:
2815 ## raise Error("ERROR: Could not execute DBS query")
2816 
2817 ## try:
2818 ## num_sites = []
2819 ## class Handler(xml.sax.handler.ContentHandler):
2820 ## def startElement(self, name, attrs):
2821 ## if name == "result":
2822 ## num_sites.append(str(attrs["COUNT_STORAGEELEMENT"]))
2823 ## xml.sax.parseString(api_result, Handler())
2824 ## except SAXParseException:
2825 ## raise Error("ERROR: Could not parse DBS server output")
2826 
2827 ## # DEBUG DEBUG DEBUG
2828 ## assert len(num_sites) == 1
2829 ## # DEBUG DEBUG DEBUG end
2830 
2831 ## num_sites = int(num_sites[0])
2832 
2833 ## # End of dbs_resolve_dataset_number_of_sites.
2834 ## return num_sites
2835 
2836  ##########
2837 
2838 ## def dbs_check_dataset_spread(self, dataset_name):
2839 ## """Figure out across how many sites this dataset is spread.
2840 
2841 ## NOTE: This is something we need to figure out per run, since
2842 ## we want to submit harvesting jobs per run.
2843 
2844 ## Basically three things can happen with a given dataset:
2845 ## - the whole dataset is available on a single site,
2846 ## - the whole dataset is available (mirrored) at multiple sites,
2847 ## - the dataset is spread across multiple sites and there is no
2848 ## single site containing the full dataset in one place.
2849 
2850 ## NOTE: If all goes well, it should not be possible that
2851 ## anything but a _full_ dataset is mirrored. So we ignore the
2852 ## possibility in which for example one site contains the full
2853 ## dataset and two others mirror half of it.
2854 ## ANOTHER NOTE: According to some people this last case _could_
2855 ## actually happen. I will not design for it, but make sure it
2856 ## ends up as a false negative, in which case we just loose some
2857 ## efficiency and treat the dataset (unnecessarily) as
2858 ## spread-out.
2859 
2860 ## We don't really care about the first two possibilities, but in
2861 ## the third case we need to make sure to run the harvesting in
2862 ## two-step mode.
2863 
2864 ## This method checks with DBS which of the above cases is true
2865 ## for the dataset name given, and returns a 1 for the first two
2866 ## cases, and the number of sites across which the dataset is
2867 ## spread for the third case.
2868 
2869 ## The way in which this is done is by asking how many files each
2870 ## site has for the dataset. In the first case there is only one
2871 ## site, in the second case all sites should have the same number
2872 ## of files (i.e. the total number of files in the dataset) and
2873 ## in the third case the file counts from all sites should add up
2874 ## to the total file count for the dataset.
2875 
2876 ## """
2877 
2878 ## # DEBUG DEBUG DEBUG
2879 ## # If we get here DBS should have been set up already.
2880 ## assert not self.dbs_api is None
2881 ## # DEBUG DEBUG DEBUG end
2882 
2883 ## api = self.dbs_api
2884 ## dbs_query = "find run, run.numevents, site, file.count " \
2885 ## "where dataset = %s " \
2886 ## "and dataset.status = VALID" % \
2887 ## dataset_name
2888 ## try:
2889 ## api_result = api.executeQuery(dbs_query)
2890 ## except DbsApiException:
2891 ## msg = "ERROR: Could not execute DBS query"
2892 ## self.logger.fatal(msg)
2893 ## raise Error(msg)
2894 
2895 ## # Index things by run number. No cross-check is done to make
2896 ## # sure we get results for each and every run in the
2897 ## # dataset. I'm not sure this would make sense since we'd be
2898 ## # cross-checking DBS info with DBS info anyway. Note that we
2899 ## # use the file count per site to see if we're dealing with an
2900 ## # incomplete vs. a mirrored dataset.
2901 ## sample_info = {}
2902 ## try:
2903 ## class Handler(xml.sax.handler.ContentHandler):
2904 ## def startElement(self, name, attrs):
2905 ## if name == "result":
2906 ## run_number = int(attrs["RUNS_RUNNUMBER"])
2907 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
2908 ## file_count = int(attrs["COUNT_FILES"])
2909 ## # BUG BUG BUG
2910 ## # Doh! For some reason DBS never returns any other
2911 ## # event count than zero.
2912 ## event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
2913 ## # BUG BUG BUG end
2914 ## info = (site_name, file_count, event_count)
2915 ## try:
2916 ## sample_info[run_number].append(info)
2917 ## except KeyError:
2918 ## sample_info[run_number] = [info]
2919 ## xml.sax.parseString(api_result, Handler())
2920 ## except SAXParseException:
2921 ## msg = "ERROR: Could not parse DBS server output"
2922 ## self.logger.fatal(msg)
2923 ## raise Error(msg)
2924 
2925 ## # Now translate this into a slightly more usable mapping.
2926 ## sites = {}
2927 ## for (run_number, site_info) in sample_info.iteritems():
2928 ## # Quick-n-dirty trick to see if all file counts are the
2929 ## # same.
2930 ## unique_file_counts = set([i[1] for i in site_info])
2931 ## if len(unique_file_counts) == 1:
2932 ## # Okay, so this must be a mirrored dataset.
2933 ## # We have to pick one but we have to be careful. We
2934 ## # cannot submit to things like a T0, a T1, or CAF.
2935 ## site_names = [self.pick_a_site([i[0] for i in site_info])]
2936 ## nevents = [site_info[0][2]]
2937 ## else:
2938 ## # Looks like this is a spread-out sample.
2939 ## site_names = [i[0] for i in site_info]
2940 ## nevents = [i[2] for i in site_info]
2941 ## sites[run_number] = zip(site_names, nevents)
2942 
2943 ## self.logger.debug("Sample `%s' spread is:" % dataset_name)
2944 ## run_numbers = sites.keys()
2945 ## run_numbers.sort()
2946 ## for run_number in run_numbers:
2947 ## self.logger.debug(" run # %6d: %d sites (%s)" % \
2948 ## (run_number,
2949 ## len(sites[run_number]),
2950 ## ", ".join([i[0] for i in sites[run_number]])))
2951 
2952 ## # End of dbs_check_dataset_spread.
2953 ## return sites
2954 
2955 ## # DEBUG DEBUG DEBUG
2956 ## # Just kept for debugging now.
2957 ## def dbs_check_dataset_spread_old(self, dataset_name):
2958 ## """Figure out across how many sites this dataset is spread.
2959 
2960 ## NOTE: This is something we need to figure out per run, since
2961 ## we want to submit harvesting jobs per run.
2962 
2963 ## Basically three things can happen with a given dataset:
2964 ## - the whole dataset is available on a single site,
2965 ## - the whole dataset is available (mirrored) at multiple sites,
2966 ## - the dataset is spread across multiple sites and there is no
2967 ## single site containing the full dataset in one place.
2968 
2969 ## NOTE: If all goes well, it should not be possible that
2970 ## anything but a _full_ dataset is mirrored. So we ignore the
2971 ## possibility in which for example one site contains the full
2972 ## dataset and two others mirror half of it.
2973 ## ANOTHER NOTE: According to some people this last case _could_
2974 ## actually happen. I will not design for it, but make sure it
2975 ## ends up as a false negative, in which case we just loose some
2976 ## efficiency and treat the dataset (unnecessarily) as
2977 ## spread-out.
2978 
2979 ## We don't really care about the first two possibilities, but in
2980 ## the third case we need to make sure to run the harvesting in
2981 ## two-step mode.
2982 
2983 ## This method checks with DBS which of the above cases is true
2984 ## for the dataset name given, and returns a 1 for the first two
2985 ## cases, and the number of sites across which the dataset is
2986 ## spread for the third case.
2987 
2988 ## The way in which this is done is by asking how many files each
2989 ## site has for the dataset. In the first case there is only one
2990 ## site, in the second case all sites should have the same number
2991 ## of files (i.e. the total number of files in the dataset) and
2992 ## in the third case the file counts from all sites should add up
2993 ## to the total file count for the dataset.
2994 
2995 ## """
2996 
2997 ## # DEBUG DEBUG DEBUG
2998 ## # If we get here DBS should have been set up already.
2999 ## assert not self.dbs_api is None
3000 ## # DEBUG DEBUG DEBUG end
3001 
3002 ## api = self.dbs_api
3003 ## dbs_query = "find run, run.numevents, site, file.count " \
3004 ## "where dataset = %s " \
3005 ## "and dataset.status = VALID" % \
3006 ## dataset_name
3007 ## try:
3008 ## api_result = api.executeQuery(dbs_query)
3009 ## except DbsApiException:
3010 ## msg = "ERROR: Could not execute DBS query"
3011 ## self.logger.fatal(msg)
3012 ## raise Error(msg)
3013 
3014 ## # Index things by run number. No cross-check is done to make
3015 ## # sure we get results for each and every run in the
3016 ## # dataset. I'm not sure this would make sense since we'd be
3017 ## # cross-checking DBS info with DBS info anyway. Note that we
3018 ## # use the file count per site to see if we're dealing with an
3019 ## # incomplete vs. a mirrored dataset.
3020 ## sample_info = {}
3021 ## try:
3022 ## class Handler(xml.sax.handler.ContentHandler):
3023 ## def startElement(self, name, attrs):
3024 ## if name == "result":
3025 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3026 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
3027 ## file_count = int(attrs["COUNT_FILES"])
3028 ## # BUG BUG BUG
3029 ## # Doh! For some reason DBS never returns any other
3030 ## # event count than zero.
3031 ## event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
3032 ## # BUG BUG BUG end
3033 ## info = (site_name, file_count, event_count)
3034 ## try:
3035 ## sample_info[run_number].append(info)
3036 ## except KeyError:
3037 ## sample_info[run_number] = [info]
3038 ## xml.sax.parseString(api_result, Handler())
3039 ## except SAXParseException:
3040 ## msg = "ERROR: Could not parse DBS server output"
3041 ## self.logger.fatal(msg)
3042 ## raise Error(msg)
3043 
3044 ## # Now translate this into a slightly more usable mapping.
3045 ## sites = {}
3046 ## for (run_number, site_info) in sample_info.iteritems():
3047 ## # Quick-n-dirty trick to see if all file counts are the
3048 ## # same.
3049 ## unique_file_counts = set([i[1] for i in site_info])
3050 ## if len(unique_file_counts) == 1:
3051 ## # Okay, so this must be a mirrored dataset.
3052 ## # We have to pick one but we have to be careful. We
3053 ## # cannot submit to things like a T0, a T1, or CAF.
3054 ## site_names = [self.pick_a_site([i[0] for i in site_info])]
3055 ## nevents = [site_info[0][2]]
3056 ## else:
3057 ## # Looks like this is a spread-out sample.
3058 ## site_names = [i[0] for i in site_info]
3059 ## nevents = [i[2] for i in site_info]
3060 ## sites[run_number] = zip(site_names, nevents)
3061 
3062 ## self.logger.debug("Sample `%s' spread is:" % dataset_name)
3063 ## run_numbers = sites.keys()
3064 ## run_numbers.sort()
3065 ## for run_number in run_numbers:
3066 ## self.logger.debug(" run # %6d: %d site(s) (%s)" % \
3067 ## (run_number,
3068 ## len(sites[run_number]),
3069 ## ", ".join([i[0] for i in sites[run_number]])))
3070 
3071 ## # End of dbs_check_dataset_spread_old.
3072 ## return sites
3073 ## # DEBUG DEBUG DEBUG end
3074 
3075  ##########
3076 
3077  def dbs_check_dataset_spread(self, dataset_name):
3078  """Figure out the number of events in each run of this dataset.
3079 
3080  This is a more efficient way of doing this than calling
3081  dbs_resolve_number_of_events for each run.
3082 
3083  """
3084 
3085  self.logger.debug("Checking spread of dataset `%s'" % dataset_name)
3086 
3087  # DEBUG DEBUG DEBUG
3088  # If we get here DBS should have been set up already.
3089  assert not self.dbs_api is None
3090  # DEBUG DEBUG DEBUG end
3091 
3092  api = self.dbs_api
3093  dbs_query = "find run.number, site, file.name, file.numevents " \
3094  "where dataset = %s " \
3095  "and dataset.status = VALID" % \
3096  dataset_name
3097  try:
3098  api_result = api.executeQuery(dbs_query)
3099  except DBSAPI.dbsApiException.DbsApiException:
3100  msg = "ERROR: Could not execute DBS query"
3101  self.logger.fatal(msg)
3102  raise Error(msg)
3103 
3104  handler = DBSXMLHandler(["run.number", "site", "file.name", "file.numevents"])
3105  parser = xml.sax.make_parser()
3106  parser.setContentHandler(handler)
3107 
3108  try:
3109  # OBSOLETE OBSOLETE OBSOLETE
3110 ## class Handler(xml.sax.handler.ContentHandler):
3111 ## def startElement(self, name, attrs):
3112 ## if name == "result":
3113 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
3114 ## # TODO TODO TODO
3115 ## # Ugly hack to get around cases like this:
3116 ## # $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3117 ## # Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3118 ## # Processing ... \
3119 ## # PATH STORAGEELEMENT_SENAME COUNT_FILES
3120 ## # _________________________________________________________________________________
3121 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO 1
3122 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO cmssrm.fnal.gov 12
3123 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO srm-cms.cern.ch 12
3124 ## if len(site_name) < 1:
3125 ## return
3126 ## # TODO TODO TODO end
3127 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3128 ## file_name = str(attrs["FILES_LOGICALFILENAME"])
3129 ## nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3130 ## # I know, this is a bit of a kludge.
3131 ## if not files_info.has_key(run_number):
3132 ## # New run.
3133 ## files_info[run_number] = {}
3134 ## files_info[run_number][file_name] = (nevents,
3135 ## [site_name])
3136 ## elif not files_info[run_number].has_key(file_name):
3137 ## # New file for a known run.
3138 ## files_info[run_number][file_name] = (nevents,
3139 ## [site_name])
3140 ## else:
3141 ## # New entry for a known file for a known run.
3142 ## # DEBUG DEBUG DEBUG
3143 ## # Each file should have the same number of
3144 ## # events independent of the site it's at.
3145 ## assert nevents == files_info[run_number][file_name][0]
3146 ## # DEBUG DEBUG DEBUG end
3147 ## files_info[run_number][file_name][1].append(site_name)
3148  # OBSOLETE OBSOLETE OBSOLETE end
3149  xml.sax.parseString(api_result, handler)
3150  except SAXParseException:
3151  msg = "ERROR: Could not parse DBS server output"
3152  self.logger.fatal(msg)
3153  raise Error(msg)
3154 
3155  # DEBUG DEBUG DEBUG
3156  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
3157  # DEBUG DEBUG DEBUG end
3158 
3159  # Now reshuffle all results a bit so we can more easily use
3160  # them later on. (Remember that all arrays in the results
3161  # should have equal length.)
3162  files_info = {}
3163  for (index, site_name) in enumerate(handler.results["site"]):
3164  # Ugly hack to get around cases like this:
3165  # $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3166  # Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3167  # Processing ... \
3168  # PATH STORAGEELEMENT_SENAME COUNT_FILES
3169  # _________________________________________________________________________________
3170  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO 1
3171  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO cmssrm.fnal.gov 12
3172  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO srm-cms.cern.ch 12
3173  if len(site_name) < 1:
3174  continue
3175  run_number = int(handler.results["run.number"][index])
3176  file_name = handler.results["file.name"][index]
3177  nevents = int(handler.results["file.numevents"][index])
3178 
3179  # I know, this is a bit of a kludge.
3180  if run_number not in files_info:
3181  # New run.
3182  files_info[run_number] = {}
3183  files_info[run_number][file_name] = (nevents,
3184  [site_name])
3185  elif file_name not in files_info[run_number]:
3186  # New file for a known run.
3187  files_info[run_number][file_name] = (nevents,
3188  [site_name])
3189  else:
3190  # New entry for a known file for a known run.
3191  # DEBUG DEBUG DEBUG
3192  # Each file should have the same number of
3193  # events independent of the site it's at.
3194  assert nevents == files_info[run_number][file_name][0]
3195  # DEBUG DEBUG DEBUG end
3196  files_info[run_number][file_name][1].append(site_name)
3197 
3198  # Remove any information for files that are not available
3199  # anywhere. NOTE: After introducing the ugly hack above, this
3200  # is a bit redundant, but let's keep it for the moment.
3201  for run_number in files_info.keys():
3202  files_without_sites = [i for (i, j) in \
3203  files_info[run_number].items() \
3204  if len(j[1]) < 1]
3205  if len(files_without_sites) > 0:
3206  self.logger.warning("Removing %d file(s)" \
3207  " with empty site names" % \
3208  len(files_without_sites))
3209  for file_name in files_without_sites:
3210  del files_info[run_number][file_name]
3211  # files_info[run_number][file_name] = (files_info \
3212  # [run_number] \
3213  # [file_name][0], [])
3214 
3215  # And another bit of a kludge.
3216  num_events_catalog = {}
3217  for run_number in files_info.keys():
3218  site_names = list(set([j for i in files_info[run_number].values() for j in i[1]]))
3219 
3220  # NOTE: The term `mirrored' does not have the usual
3221  # meaning here. It basically means that we can apply
3222  # single-step harvesting.
3223  mirrored = None
3224  if len(site_names) > 1:
3225  # Now we somehow need to figure out if we're dealing
3226  # with a mirrored or a spread-out dataset. The rule we
3227  # use here is that we're dealing with a spread-out
3228  # dataset unless we can find at least one site
3229  # containing exactly the full list of files for this
3230  # dataset that DBS knows about. In that case we just
3231  # use only that site.
3232  all_file_names = files_info[run_number].keys()
3233  all_file_names = set(all_file_names)
3234  sites_with_complete_copies = []
3235  for site_name in site_names:
3236  files_at_site = [i for (i, (j, k)) \
3237  in files_info[run_number].items() \
3238  if site_name in k]
3239  files_at_site = set(files_at_site)
3240  if files_at_site == all_file_names:
3241  sites_with_complete_copies.append(site_name)
3242  if len(sites_with_complete_copies) < 1:
3243  # This dataset/run is available at more than one
3244  # site, but no one has a complete copy. So this is
3245  # a spread-out sample.
3246  mirrored = False
3247  else:
3248  if len(sites_with_complete_copies) > 1:
3249  # This sample is available (and complete) at
3250  # more than one site. Definitely mirrored.
3251  mirrored = True
3252  else:
3253  # This dataset/run is available at more than
3254  # one site and at least one of them has a
3255  # complete copy. Even if this is only a single
3256  # site, let's call this `mirrored' and run the
3257  # single-step harvesting.
3258  mirrored = True
3259 
3260 ## site_names_ref = set(files_info[run_number].values()[0][1])
3261 ## for site_names_tmp in files_info[run_number].values()[1:]:
3262 ## if set(site_names_tmp[1]) != site_names_ref:
3263 ## mirrored = False
3264 ## break
3265 
3266  if mirrored:
3267  self.logger.debug(" -> run appears to be `mirrored'")
3268  else:
3269  self.logger.debug(" -> run appears to be spread-out")
3270 
3271  if mirrored and \
3272  len(sites_with_complete_copies) != len(site_names):
3273  # Remove any references to incomplete sites if we
3274  # have at least one complete site (and if there
3275  # are incomplete sites).
3276  for (file_name, (i, sites)) in files_info[run_number].items():
3277  complete_sites = [site for site in sites \
3278  if site in sites_with_complete_copies]
3279  files_info[run_number][file_name] = (i, complete_sites)
3280  site_names = sites_with_complete_copies
3281 
3282  self.logger.debug(" for run #%d:" % run_number)
3283  num_events_catalog[run_number] = {}
3284  num_events_catalog[run_number]["all_sites"] = sum([i[0] for i in files_info[run_number].values()])
3285  if len(site_names) < 1:
3286  self.logger.debug(" run is not available at any site")
 3287  self.logger.debug(" (but should contain %d events)" % \
3288  num_events_catalog[run_number]["all_sites"])
3289  else:
3290  self.logger.debug(" at all sites combined there are %d events" % \
3291  num_events_catalog[run_number]["all_sites"])
3292  for site_name in site_names:
3293  num_events_catalog[run_number][site_name] = sum([i[0] for i in files_info[run_number].values() if site_name in i[1]])
3294  self.logger.debug(" at site `%s' there are %d events" % \
3295  (site_name, num_events_catalog[run_number][site_name]))
3296  num_events_catalog[run_number]["mirrored"] = mirrored
3297 
3298  # End of dbs_check_dataset_spread.
3299  return num_events_catalog
3300 
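  # Illustrative sketch of the catalog returned above (run number, site
  # names and event counts are made up). For each run it holds the
  # total event count, the per-site event counts, and the `mirrored'
  # flag that decides between single-step and two-step harvesting:
  #
  #   num_events_catalog = {
  #       143657: {"all_sites": 120000,
  #                "srm-cms.cern.ch": 120000,
  #                "cmssrm.fnal.gov": 120000,
  #                "mirrored": True},
  #   }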
3301  # Beginning of old version.
3302 ## def dbs_check_dataset_num_events(self, dataset_name):
3303 ## """Figure out the number of events in each run of this dataset.
3304 
3305 ## This is a more efficient way of doing this than calling
3306 ## dbs_resolve_number_of_events for each run.
3307 
3308 ## # BUG BUG BUG
3309 ## # This might very well not work at all for spread-out samples. (?)
3310 ## # BUG BUG BUG end
3311 
3312 ## """
3313 
3314 ## # DEBUG DEBUG DEBUG
3315 ## # If we get here DBS should have been set up already.
3316 ## assert not self.dbs_api is None
3317 ## # DEBUG DEBUG DEBUG end
3318 
3319 ## api = self.dbs_api
3320 ## dbs_query = "find run.number, file.name, file.numevents where dataset = %s " \
3321 ## "and dataset.status = VALID" % \
3322 ## dataset_name
3323 ## try:
3324 ## api_result = api.executeQuery(dbs_query)
3325 ## except DbsApiException:
3326 ## msg = "ERROR: Could not execute DBS query"
3327 ## self.logger.fatal(msg)
3328 ## raise Error(msg)
3329 
3330 ## try:
3331 ## files_info = {}
3332 ## class Handler(xml.sax.handler.ContentHandler):
3333 ## def startElement(self, name, attrs):
3334 ## if name == "result":
3335 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3336 ## file_name = str(attrs["FILES_LOGICALFILENAME"])
3337 ## nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3338 ## try:
3339 ## files_info[run_number][file_name] = nevents
3340 ## except KeyError:
3341 ## files_info[run_number] = {file_name: nevents}
3342 ## xml.sax.parseString(api_result, Handler())
3343 ## except SAXParseException:
3344 ## msg = "ERROR: Could not parse DBS server output"
3345 ## self.logger.fatal(msg)
3346 ## raise Error(msg)
3347 
3348 ## num_events_catalog = {}
3349 ## for run_number in files_info.keys():
3350 ## num_events_catalog[run_number] = sum(files_info[run_number].values())
3351 
3352 ## # End of dbs_check_dataset_num_events.
3353 ## return num_events_catalog
3354  # End of old version.
3355 
3356  ##########
3357 
3358  def build_dataset_list(self, input_method, input_name):
3359  """Build a list of all datasets to be processed.
3360 
3361  """
3362 
3363  dataset_names = []
3364 
3365  # It may be, but only for the list of datasets to ignore, that
3366  # the input method and name are None because nothing was
3367  # specified. In that case just an empty list is returned.
3368  if input_method is None:
3369  pass
3370  elif input_method == "dataset":
3371  # Input comes from a dataset name directly on the command
3372  # line. But, this can also contain wildcards so we need
3373  # DBS to translate it conclusively into a list of explicit
3374  # dataset names.
3375  self.logger.info("Asking DBS for dataset names")
3376  dataset_names = self.dbs_resolve_dataset_name(input_name)
3377  elif input_method == "datasetfile":
3378  # In this case a file containing a list of dataset names
3379  # is specified. Still, each line may contain wildcards so
3380  # this step also needs help from DBS.
3381  # NOTE: Lines starting with a `#' are ignored.
3382  self.logger.info("Reading input from list file `%s'" % \
3383  input_name)
3384  try:
3385  listfile = open("/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name, "r")
3386  print "open listfile"
3387  for dataset in listfile:
3388  # Skip empty lines.
3389  dataset_stripped = dataset.strip()
3390  if len(dataset_stripped) < 1:
3391  continue
3392  # Skip lines starting with a `#'.
3393  if dataset_stripped[0] != "#":
3394  dataset_names.extend(self. \
3395  dbs_resolve_dataset_name(dataset_stripped))
3396  listfile.close()
3397  except IOError:
3398  msg = "ERROR: Could not open input list file `%s'" % \
3399  input_name
3400  self.logger.fatal(msg)
3401  raise Error(msg)
3402  else:
3403  # DEBUG DEBUG DEBUG
3404  # We should never get here.
3405  assert False, "Unknown input method `%s'" % input_method
3406  # DEBUG DEBUG DEBUG end
3407 
3408  # Remove duplicates from the dataset list.
3409  # NOTE: There should not be any duplicates in any list coming
3410  # from DBS, but maybe the user provided a list file with less
3411  # care.
3412  dataset_names = list(set(dataset_names))
3413 
3414  # Store for later use.
3415  dataset_names.sort()
3416 
3417  # End of build_dataset_list.
3418  return dataset_names
3419 
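 # A sketch of a list file for the `datasetfile' input method,
 # using made-up dataset paths. Wildcards are resolved through DBS
 # and lines starting with a `#' are skipped:
 #
 #   # RelVal samples to harvest.
 #   /RelValTTbar/*/GEN-SIM-RECO
 #   /RelValMinBias/*/GEN-SIM-RECO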
3420  ##########
3421 
 3422  def build_dataset_use_list(self):
 3423  """Build a list of datasets to process.
3424 
3425  """
3426 
3427  self.logger.info("Building list of datasets to consider...")
3428 
3429  input_method = self.input_method["datasets"]["use"]
3430  input_name = self.input_name["datasets"]["use"]
3431  dataset_names = self.build_dataset_list(input_method,
3432  input_name)
3433  self.datasets_to_use = dict(list(zip(dataset_names,
3434  [None] * len(dataset_names))))
3435 
3436  self.logger.info(" found %d dataset(s) to process:" % \
3437  len(dataset_names))
3438  for dataset in dataset_names:
3439  self.logger.info(" `%s'" % dataset)
3440 
3441  # End of build_dataset_use_list.
3442 
3443  ##########
3444 
 3445  def build_dataset_ignore_list(self):
 3446  """Build a list of datasets to ignore.
3447 
3448  NOTE: We should always have a list of datasets to process, but
3449  it may be that we don't have a list of datasets to ignore.
3450 
3451  """
3452 
3453  self.logger.info("Building list of datasets to ignore...")
3454 
3455  input_method = self.input_method["datasets"]["ignore"]
3456  input_name = self.input_name["datasets"]["ignore"]
3457  dataset_names = self.build_dataset_list(input_method,
3458  input_name)
3459  self.datasets_to_ignore = dict(list(zip(dataset_names,
3460  [None] * len(dataset_names))))
3461 
3462  self.logger.info(" found %d dataset(s) to ignore:" % \
3463  len(dataset_names))
3464  for dataset in dataset_names:
3465  self.logger.info(" `%s'" % dataset)
3466 
3467  # End of build_dataset_ignore_list.
3468 
3469  ##########
3470 
3471  def build_runs_list(self, input_method, input_name):
3472 
3473  runs = []
3474 
3475  # A list of runs (either to use or to ignore) is not
3476  # required. This protects against `empty cases.'
3477  if input_method is None:
3478  pass
3479  elif input_method == "runs":
3480  # A list of runs was specified directly from the command
3481  # line.
3482  self.logger.info("Reading list of runs from the " \
3483  "command line")
3484  runs.extend([int(i.strip()) \
3485  for i in input_name.split(",") \
3486  if len(i.strip()) > 0])
3487  elif input_method == "runslistfile":
3488  # We were passed a file containing a list of runs.
3489  self.logger.info("Reading list of runs from file `%s'" % \
3490  input_name)
3491  try:
3492  listfile = open(input_name, "r")
3493  for run in listfile:
3494  # Skip empty lines.
3495  run_stripped = run.strip()
3496  if len(run_stripped) < 1:
3497  continue
3498  # Skip lines starting with a `#'.
3499  if run_stripped[0] != "#":
3500  runs.append(int(run_stripped))
3501  listfile.close()
3502  except IOError:
3503  msg = "ERROR: Could not open input list file `%s'" % \
3504  input_name
3505  self.logger.fatal(msg)
3506  raise Error(msg)
3507 
3508  else:
3509  # DEBUG DEBUG DEBUG
3510  # We should never get here.
3511  assert False, "Unknown input method `%s'" % input_method
3512  # DEBUG DEBUG DEBUG end
3513 
3514  # Remove duplicates, sort and done.
3515  runs = list(set(runs))
3516 
3517  # End of build_runs_list().
3518  return runs
3519 
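 # To illustrate the two input methods with made-up run numbers:
 # with `runs' the list comes straight from the command line,
 # e.g. `123456,123457', while with `runslistfile' the same two
 # runs would appear one per line in a text file (lines starting
 # with a `#' are again skipped).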
3520  ##########
3521 
 3522  def build_runs_use_list(self):
 3523  """Build a list of runs to process.
3524 
3525  """
3526 
3527  self.logger.info("Building list of runs to consider...")
3528 
3529  input_method = self.input_method["runs"]["use"]
3530  input_name = self.input_name["runs"]["use"]
3531  runs = self.build_runs_list(input_method, input_name)
3532  self.runs_to_use = dict(list(zip(runs, [None] * len(runs))))
3533 
3534  self.logger.info(" found %d run(s) to process:" % \
3535  len(runs))
3536  if len(runs) > 0:
3537  self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3538 
 3539  # End of build_runs_use_list().
3540 
3541  ##########
3542 
 3543  def build_runs_ignore_list(self):
 3544  """Build a list of runs to ignore.
3545 
3546  NOTE: We should always have a list of runs to process, but
3547  it may be that we don't have a list of runs to ignore.
3548 
3549  """
3550 
3551  self.logger.info("Building list of runs to ignore...")
3552 
3553  input_method = self.input_method["runs"]["ignore"]
3554  input_name = self.input_name["runs"]["ignore"]
3555  runs = self.build_runs_list(input_method, input_name)
3556  self.runs_to_ignore = dict(list(zip(runs, [None] * len(runs))))
3557 
3558  self.logger.info(" found %d run(s) to ignore:" % \
3559  len(runs))
3560  if len(runs) > 0:
3561  self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3562 
3563  # End of build_runs_ignore_list().
3564 
3565  ##########
3566 
 3567  def process_dataset_ignore_list(self):
 3568  """Update the list of datasets taking into account the ones to
3569  ignore.
3570 
3571  Both lists have been generated before from DBS and both are
3572  assumed to be unique.
3573 
3574  NOTE: The advantage of creating the ignore list from DBS (in
3575  case a regexp is given) and matching that instead of directly
3576  matching the ignore criterion against the list of datasets (to
3577  consider) built from DBS is that in the former case we're sure
3578  that all regexps are treated exactly as DBS would have done
3579  without the cmsHarvester.
3580 
3581  NOTE: This only removes complete samples. Exclusion of single
3582  runs is done by the book keeping. So the assumption is that a
3583  user never wants to harvest just part (i.e. n out of N runs)
3584  of a sample.
3585 
3586  """
3587 
3588  self.logger.info("Processing list of datasets to ignore...")
3589 
3590  self.logger.debug("Before processing ignore list there are %d " \
3591  "datasets in the list to be processed" % \
3592  len(self.datasets_to_use))
3593 
3594  # Simple approach: just loop and search.
3595  dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3596  for dataset_name in self.datasets_to_use.keys():
3597  if dataset_name in self.datasets_to_ignore.keys():
3598  del dataset_names_filtered[dataset_name]
3599 
3600  self.logger.info(" --> Removed %d dataset(s)" % \
3601  (len(self.datasets_to_use) -
3602  len(dataset_names_filtered)))
3603 
3604  self.datasets_to_use = dataset_names_filtered
3605 
3606  self.logger.debug("After processing ignore list there are %d " \
3607  "datasets in the list to be processed" % \
3608  len(self.datasets_to_use))
3609 
3610  # End of process_dataset_ignore_list.
3611 
3612  ##########
3613 
 3614  def process_runs_use_and_ignore_lists(self):
 3615 
3616  self.logger.info("Processing list of runs to use and ignore...")
3617 
3618  # This basically adds all runs in a dataset to be processed,
3619  # except for any runs that are not specified in the `to use'
3620  # list and any runs that are specified in the `to ignore'
3621  # list.
3622 
3623  # NOTE: It is assumed that those lists make sense. The input
3624  # should be checked against e.g. overlapping `use' and
3625  # `ignore' lists.
3626 
3627  runs_to_use = self.runs_to_use
3628  runs_to_ignore = self.runs_to_ignore
3629 
3630  for dataset_name in self.datasets_to_use:
3631  runs_in_dataset = self.datasets_information[dataset_name]["runs"]
3632 
3633  # First some sanity checks.
3634  runs_to_use_tmp = []
3635  for run in runs_to_use:
3636  if not run in runs_in_dataset:
3637  self.logger.warning("Dataset `%s' does not contain " \
3638  "requested run %d " \
3639  "--> ignoring `use' of this run" % \
3640  (dataset_name, run))
3641  else:
3642  runs_to_use_tmp.append(run)
3643 
3644  if len(runs_to_use) > 0:
3645  runs = runs_to_use_tmp
3646  self.logger.info("Using %d out of %d runs " \
3647  "of dataset `%s'" % \
3648  (len(runs), len(runs_in_dataset),
3649  dataset_name))
3650  else:
3651  runs = runs_in_dataset
3652 
3653  if len(runs_to_ignore) > 0:
3654  runs_tmp = []
3655  for run in runs:
3656  if not run in runs_to_ignore:
3657  runs_tmp.append(run)
3658  self.logger.info("Ignoring %d out of %d runs " \
3659  "of dataset `%s'" % \
3660  (len(runs)- len(runs_tmp),
3661  len(runs_in_dataset),
3662  dataset_name))
3663  runs = runs_tmp
3664 
3665  if self.todofile != "YourToDofile.txt":
3666  runs_todo = []
3667  print "Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile
3668  cmd="grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3669  (status, output)=commands.getstatusoutput(cmd)
3670  for run in runs:
3671  run_str="%s" %run
3672  if run_str in output:
3673  runs_todo.append(run)
3674  self.logger.info("Using %d runs " \
3675  "of dataset `%s'" % \
3676  (len(runs_todo),
3677  dataset_name))
3678  runs=runs_todo
3679 
3680  Json_runs = []
3681  if self.Jsonfilename != "YourJSON.txt":
3682  good_runs = []
3683  self.Jsonlumi = True
3684  # We were passed a Jsonfile containing a dictionary of
 3685  # run/lumisection-pairs
3686  self.logger.info("Reading runs and lumisections from file `%s'" % \
3687  self.Jsonfilename)
3688  try:
3689  Jsonfile = open(self.Jsonfilename, "r")
3690  for names in Jsonfile:
3691  dictNames= eval(str(names))
3692  for key in dictNames:
3693  intkey=int(key)
3694  Json_runs.append(intkey)
3695  Jsonfile.close()
3696  except IOError:
3697  msg = "ERROR: Could not open Jsonfile `%s'" % \
 3698  self.Jsonfilename
3699  self.logger.fatal(msg)
3700  raise Error(msg)
3701  for run in runs:
3702  if run in Json_runs:
3703  good_runs.append(run)
3704  self.logger.info("Using %d runs " \
3705  "of dataset `%s'" % \
3706  (len(good_runs),
3707  dataset_name))
3708  runs=good_runs
3709  if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):
3710  good_runs = []
3711  # We were passed a Jsonfile containing a dictionary of
 3712  # run/lumisection-pairs
3713  self.logger.info("Reading runs from file `%s'" % \
3714  self.Jsonrunfilename)
3715  try:
3716  Jsonfile = open(self.Jsonrunfilename, "r")
3717  for names in Jsonfile:
3718  dictNames= eval(str(names))
3719  for key in dictNames:
3720  intkey=int(key)
3721  Json_runs.append(intkey)
3722  Jsonfile.close()
3723  except IOError:
3724  msg = "ERROR: Could not open Jsonfile `%s'" % \
 3725  self.Jsonrunfilename
3726  self.logger.fatal(msg)
3727  raise Error(msg)
3728  for run in runs:
3729  if run in Json_runs:
3730  good_runs.append(run)
3731  self.logger.info("Using %d runs " \
3732  "of dataset `%s'" % \
3733  (len(good_runs),
3734  dataset_name))
3735  runs=good_runs
3736 
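 # The JSON files read above are assumed to follow the usual CMS
 # luminosity JSON layout: a dictionary mapping run numbers (as
 # strings) to lists of lumisection ranges, e.g. (made-up numbers):
 #
 #   {"190645": [[10, 110]], "190704": [[1, 3]]}
 #
 # Only the run numbers (the dictionary keys) are used here.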
3737  self.datasets_to_use[dataset_name] = runs
3738 
3739  # End of process_runs_use_and_ignore_lists().
3740 
3741  ##########
3742 
 3743  def singlify_datasets(self):
 3744  """Remove all but the largest part of all datasets.
3745 
3746  This allows us to harvest at least part of these datasets
3747  using single-step harvesting until the two-step approach
3748  works.
3749 
3750  """
3751 
3752  # DEBUG DEBUG DEBUG
3753  assert self.harvesting_mode == "single-step-allow-partial"
3754  # DEBUG DEBUG DEBUG end
3755 
3756  for dataset_name in self.datasets_to_use:
3757  for run_number in self.datasets_information[dataset_name]["runs"]:
3758  max_events = max(self.datasets_information[dataset_name]["sites"][run_number].values())
3759  sites_with_max_events = [i[0] for i in self.datasets_information[dataset_name]["sites"][run_number].items() if i[1] == max_events]
3760  self.logger.warning("Singlifying dataset `%s', " \
3761  "run %d" % \
3762  (dataset_name, run_number))
3763  cmssw_version = self.datasets_information[dataset_name] \
3764  ["cmssw_version"]
3765  selected_site = self.pick_a_site(sites_with_max_events,
3766  cmssw_version)
3767 
3768  # Let's tell the user that we're manhandling this dataset.
3769  nevents_old = self.datasets_information[dataset_name]["num_events"][run_number]
3770  self.logger.warning(" --> " \
3771  "only harvesting partial statistics: " \
 3772  "%d out of %d events (%5.1f%%) " \
3773  "at site `%s'" % \
3774  (max_events,
3775  nevents_old,
3776  100. * max_events / nevents_old,
3777  selected_site))
3778  self.logger.warning("!!! Please note that the number of " \
3779  "events in the output path name will " \
3780  "NOT reflect the actual statistics in " \
3781  "the harvested results !!!")
3782 
3783  # We found the site with the highest statistics and
3784  # the corresponding number of events. (CRAB gets upset
3785  # if we ask for more events than there are at a given
3786  # site.) Now update this information in our main
3787  # datasets_information variable.
3788  self.datasets_information[dataset_name]["sites"][run_number] = {selected_site: max_events}
3789  self.datasets_information[dataset_name]["num_events"][run_number] = max_events
3790  #self.datasets_information[dataset_name]["sites"][run_number] = [selected_site]
3791 
3792  # End of singlify_datasets.
3793 
3794  ##########
3795 
 3796  def check_dataset_list(self):
 3797  """Check list of dataset names for impossible ones.
3798 
3799  Two kinds of checks are done:
3800  - Checks for things that do not make sense. These lead to
3801  errors and skipped datasets.
3802  - Sanity checks. For these warnings are issued but the user is
3803  considered to be the authoritative expert.
3804 
3805  Checks performed:
3806  - The CMSSW version encoded in the dataset name should match
3807  self.cmssw_version. This is critical.
3808  - There should be some events in the dataset/run. This is
3809  critical in the sense that CRAB refuses to create jobs for
3810  zero events. And yes, this does happen in practice. E.g. the
3811  reprocessed CRAFT08 datasets contain runs with zero events.
3812  - A cursory check is performed to see if the harvesting type
3813  makes sense for the data type. This should prevent the user
3814  from inadvertently running RelVal for data.
3815  - It is not possible to run single-step harvesting jobs on
3816  samples that are not fully contained at a single site.
3817  - Each dataset/run has to be available at at least one site.
3818 
3819  """
3820 
3821  self.logger.info("Performing sanity checks on dataset list...")
3822 
3823  dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3824 
3825  for dataset_name in self.datasets_to_use.keys():
3826 
3827  # Check CMSSW version.
3828  version_from_dataset = self.datasets_information[dataset_name] \
3829  ["cmssw_version"]
3830  if version_from_dataset != self.cmssw_version:
3831  msg = " CMSSW version mismatch for dataset `%s' " \
3832  "(%s vs. %s)" % \
3833  (dataset_name,
3834  self.cmssw_version, version_from_dataset)
3835  if self.force_running:
3836  # Expert mode: just warn, then continue.
3837  self.logger.warning("%s " \
3838  "--> `force mode' active: " \
3839  "run anyway" % msg)
3840  else:
3841  del dataset_names_after_checks[dataset_name]
3842  self.logger.warning("%s " \
3843  "--> skipping" % msg)
3844  continue
3845 
3846  ###
3847 
3848  # Check that the harvesting type makes sense for the
3849  # sample. E.g. normally one would not run the DQMOffline
3850  # harvesting on Monte Carlo.
3851  # TODO TODO TODO
3852  # This should be further refined.
3853  suspicious = False
3854  datatype = self.datasets_information[dataset_name]["datatype"]
3855  if datatype == "data":
3856  # Normally only DQM harvesting is run on data.
3857  if self.harvesting_type != "DQMOffline":
3858  suspicious = True
3859  elif datatype == "mc":
3860  if self.harvesting_type == "DQMOffline":
3861  suspicious = True
3862  else:
3863  # Doh!
3864  assert False, "ERROR Impossible data type `%s' " \
3865  "for dataset `%s'" % \
3866  (datatype, dataset_name)
3867  if suspicious:
3868  msg = " Normally one does not run `%s' harvesting " \
3869  "on %s samples, are you sure?" % \
3870  (self.harvesting_type, datatype)
3871  if self.force_running:
3872  self.logger.warning("%s " \
3873  "--> `force mode' active: " \
3874  "run anyway" % msg)
3875  else:
3876  del dataset_names_after_checks[dataset_name]
3877  self.logger.warning("%s " \
3878  "--> skipping" % msg)
3879  continue
3880 
3881  # TODO TODO TODO end
3882 
3883  ###
3884 
3885  # BUG BUG BUG
3886  # For the moment, due to a problem with DBS, I cannot
3887  # figure out the GlobalTag for data by myself. (For MC
3888  # it's no problem.) This means that unless a GlobalTag was
3889  # specified from the command line, we will have to skip
3890  # any data datasets.
3891 
3892  if datatype == "data":
3893  if self.globaltag is None:
3894  msg = "For data datasets (like `%s') " \
3895  "we need a GlobalTag" % \
3896  dataset_name
3897  del dataset_names_after_checks[dataset_name]
3898  self.logger.warning("%s " \
3899  "--> skipping" % msg)
3900  continue
3901 
3902  # BUG BUG BUG end
3903 
3904  ###
3905 
3906  # Check if the GlobalTag exists and (if we're using
3907  # reference histograms) if it's ready to be used with
3908  # reference histograms.
3909  globaltag = self.datasets_information[dataset_name]["globaltag"]
3910  if not globaltag in self.globaltag_check_cache:
3911  if self.check_globaltag(globaltag):
3912  self.globaltag_check_cache.append(globaltag)
3913  else:
3914  msg = "Something is wrong with GlobalTag `%s' " \
3915  "used by dataset `%s'!" % \
3916  (globaltag, dataset_name)
3917  if self.use_ref_hists:
3918  msg += "\n(Either it does not exist or it " \
3919  "does not contain the required key to " \
3920  "be used with reference histograms.)"
3921  else:
3922  msg += "\n(It probably just does not exist.)"
3923  self.logger.fatal(msg)
3924  raise Usage(msg)
3925 
3926  ###
3927 
3928  # Require that each run is available at least somewhere.
3929  runs_without_sites = [i for (i, j) in \
3930  self.datasets_information[dataset_name] \
3931  ["sites"].items() \
3932  if len(j) < 1 and \
3933  i in self.datasets_to_use[dataset_name]]
3934  if len(runs_without_sites) > 0:
3935  for run_without_sites in runs_without_sites:
3936  try:
3937  dataset_names_after_checks[dataset_name].remove(run_without_sites)
3938  except KeyError:
3939  pass
3940  self.logger.warning(" removed %d unavailable run(s) " \
3941  "from dataset `%s'" % \
3942  (len(runs_without_sites), dataset_name))
3943  self.logger.debug(" (%s)" % \
3944  ", ".join([str(i) for i in \
3945  runs_without_sites]))
3946 
3947  ###
3948 
3949  # Unless we're running two-step harvesting: only allow
3950  # samples located on a single site.
3951  if not self.harvesting_mode == "two-step":
3952  for run_number in self.datasets_to_use[dataset_name]:
3953  # DEBUG DEBUG DEBUG
3954 ## if self.datasets_information[dataset_name]["num_events"][run_number] != 0:
3955 ## pdb.set_trace()
3956  # DEBUG DEBUG DEBUG end
3957  num_sites = len(self.datasets_information[dataset_name] \
3958  ["sites"][run_number])
3959  if num_sites > 1 and \
3960  not self.datasets_information[dataset_name] \
3961  ["mirrored"][run_number]:
3962  # Cannot do this with a single-step job, not
3963  # even in force mode. It just does not make
3964  # sense.
3965  msg = " Dataset `%s', run %d is spread across more " \
3966  "than one site.\n" \
3967  " Cannot run single-step harvesting on " \
3968  "samples spread across multiple sites" % \
3969  (dataset_name, run_number)
3970  try:
3971  dataset_names_after_checks[dataset_name].remove(run_number)
3972  except KeyError:
3973  pass
3974  self.logger.warning("%s " \
3975  "--> skipping" % msg)
3976 
3977  ###
3978 
3979  # Require that the dataset/run is non-empty.
3980  # NOTE: To avoid reconsidering empty runs/datasets next
3981  # time around, we do include them in the book keeping.
3982  # BUG BUG BUG
3983  # This should sum only over the runs that we use!
3984  tmp = [j for (i, j) in self.datasets_information \
3985  [dataset_name]["num_events"].items() \
3986  if i in self.datasets_to_use[dataset_name]]
3987  num_events_dataset = sum(tmp)
3988  # BUG BUG BUG end
3989  if num_events_dataset < 1:
3990  msg = " dataset `%s' is empty" % dataset_name
3991  del dataset_names_after_checks[dataset_name]
3992  self.logger.warning("%s " \
3993  "--> skipping" % msg)
3994  # Update the book keeping with all the runs in the dataset.
3995  # DEBUG DEBUG DEBUG
3996  #assert set([j for (i, j) in self.datasets_information \
3997  # [dataset_name]["num_events"].items() \
3998  # if i in self.datasets_to_use[dataset_name]]) == \
3999  # set([0])
4000  # DEBUG DEBUG DEBUG end
4001  #self.book_keeping_information[dataset_name] = self.datasets_information \
4002  # [dataset_name]["num_events"]
4003  continue
4004 
4005  tmp = [i for i in \
4006  self.datasets_information[dataset_name] \
4007  ["num_events"].items() if i[1] < 1]
4008  tmp = [i for i in tmp if i[0] in self.datasets_to_use[dataset_name]]
4009  empty_runs = dict(tmp)
4010  if len(empty_runs) > 0:
4011  for empty_run in empty_runs:
4012  try:
4013  dataset_names_after_checks[dataset_name].remove(empty_run)
4014  except KeyError:
4015  pass
4016  self.logger.info(" removed %d empty run(s) from dataset `%s'" % \
4017  (len(empty_runs), dataset_name))
4018  self.logger.debug(" (%s)" % \
4019  ", ".join([str(i) for i in empty_runs]))
4020 
4021  ###
4022 
4023  # If we emptied out a complete dataset, remove the whole
4024  # thing.
4025  dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4026  for (dataset_name, runs) in dataset_names_after_checks.iteritems():
4027  if len(runs) < 1:
4028  self.logger.warning(" Removing dataset without any runs " \
4029  "(left) `%s'" % \
4030  dataset_name)
4031  del dataset_names_after_checks_tmp[dataset_name]
4032  dataset_names_after_checks = dataset_names_after_checks_tmp
4033 
4034  ###
4035 
4036  self.logger.warning(" --> Removed %d dataset(s)" % \
4037  (len(self.datasets_to_use) -
4038  len(dataset_names_after_checks)))
4039 
4040  # Now store the modified version of the dataset list.
4041  self.datasets_to_use = dataset_names_after_checks
4042 
4043  # End of check_dataset_list.
4044 
4045  ##########
4046 
4047  def escape_dataset_name(self, dataset_name):
4048  """Escape a DBS dataset name.
4049 
4050  Escape a DBS dataset name such that it does not cause trouble
4051  with the file system. This means turning each `/' into `__',
4052  except for the first one which is just removed.
4053 
4054  """
4055 
4056  escaped_dataset_name = dataset_name
4057  escaped_dataset_name = escaped_dataset_name.strip("/")
4058  escaped_dataset_name = escaped_dataset_name.replace("/", "__")
4059 
4060  return escaped_dataset_name
4061 
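 # A sketch of the escaping, using a made-up dataset name:
 #
 #   escape_dataset_name("/Cosmics/Commissioning10-v1/RAW")
 #   --> "Cosmics__Commissioning10-v1__RAW"
 #
 # i.e. the leading (and any trailing) `/' is stripped and every
 # remaining `/' becomes `__'.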
4062  ##########
4063 
4064  # BUG BUG BUG
4065  # This is a bit of a redundant method, isn't it?
4066  def create_config_file_name(self, dataset_name, run_number):
4067  """Generate the name of the configuration file to be run by
4068  CRAB.
4069 
4070  Depending on the harvesting mode (single-step or two-step)
4071  this is the name of the real harvesting configuration or the
4072  name of the first-step ME summary extraction configuration.
4073 
4074  """
4075 
4076  if self.harvesting_mode == "single-step":
4077  config_file_name = self.create_harvesting_config_file_name(dataset_name)
4078  elif self.harvesting_mode == "single-step-allow-partial":
4079  config_file_name = self.create_harvesting_config_file_name(dataset_name)
4080 ## # Only add the alarming piece to the file name if this is
4081 ## # a spread-out dataset.
4082 ## pdb.set_trace()
4083 ## if self.datasets_information[dataset_name] \
4084 ## ["mirrored"][run_number] == False:
4085 ## config_file_name = config_file_name.replace(".py", "_partial.py")
4086  elif self.harvesting_mode == "two-step":
4087  config_file_name = self.create_me_summary_config_file_name(dataset_name)
4088  else:
4089  assert False, "ERROR Unknown harvesting mode `%s'" % \
4090  self.harvesting_mode
4091 
4092  # End of create_config_file_name.
4093  return config_file_name
4094  # BUG BUG BUG end
4095 
4096  ##########
4097 
4098  def create_harvesting_config_file_name(self, dataset_name):
4099  "Generate the name to be used for the harvesting config file."
4100 
4101  file_name_base = "harvesting.py"
4102  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4103  config_file_name = file_name_base.replace(".py",
4104  "_%s.py" % \
4105  dataset_name_escaped)
4106 
4107  # End of create_harvesting_config_file_name.
4108  return config_file_name
4109 
4110  ##########
4111 
4112  def create_me_summary_config_file_name(self, dataset_name):
4113  "Generate the name of the ME summary extraction config file."
4114 
4115  file_name_base = "me_extraction.py"
4116  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4117  config_file_name = file_name_base.replace(".py",
4118  "_%s.py" % \
4119  dataset_name_escaped)
4120 
4121  # End of create_me_summary_config_file_name.
4122  return config_file_name
4123 
4124  ##########
4125 
4126  def create_output_file_name(self, dataset_name, run_number=None):
4127  """Create the name of the output file name to be used.
4128 
4129  This is the name of the output file of the `first step'. In
4130  the case of single-step harvesting this is already the final
4131  harvesting output ROOT file. In the case of two-step
4132  harvesting it is the name of the intermediary ME summary
4133  file.
4134 
4135  """
4136 
4137  # BUG BUG BUG
4138  # This method has become a bit of a mess. Originally it was
4139  # nice to have one entry point for both single- and two-step
4140  # output file names. However, now the former needs the run
4141  # number, while the latter does not even know about run
4142  # numbers. This should be fixed up a bit.
4143  # BUG BUG BUG end
4144 
4145  if self.harvesting_mode == "single-step":
4146  # DEBUG DEBUG DEBUG
4147  assert not run_number is None
4148  # DEBUG DEBUG DEBUG end
4149  output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4150  elif self.harvesting_mode == "single-step-allow-partial":
4151  # DEBUG DEBUG DEBUG
4152  assert not run_number is None
4153  # DEBUG DEBUG DEBUG end
4154  output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4155  elif self.harvesting_mode == "two-step":
4156  # DEBUG DEBUG DEBUG
4157  assert run_number is None
4158  # DEBUG DEBUG DEBUG end
4159  output_file_name = self.create_me_summary_output_file_name(dataset_name)
4160  else:
4161  # This should not be possible, but hey...
4162  assert False, "ERROR Unknown harvesting mode `%s'" % \
4163  self.harvesting_mode
4164 
 4165  # End of create_output_file_name.
4166  return output_file_name
4167 
4168  ##########
4169 
4170  def create_harvesting_output_file_name(self, dataset_name, run_number):
4171  """Generate the name to be used for the harvesting output file.
4172 
4173  This harvesting output file is the _final_ ROOT output file
4174  containing the harvesting results. In case of two-step
4175  harvesting there is an intermediate ME output file as well.
4176 
4177  """
4178 
4179  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4180 
4181  # Hmmm, looking at the code for the DQMFileSaver this might
4182  # actually be the place where the first part of this file
4183  # naming scheme comes from.
4184  # NOTE: It looks like the `V0001' comes from the DQM
4185  # version. This is something that cannot be looked up from
4186  # here, so let's hope it does not change too often.
4187  output_file_name = "DQM_V0001_R%09d__%s.root" % \
4188  (run_number, dataset_name_escaped)
4189  if self.harvesting_mode.find("partial") > -1:
4190  # Only add the alarming piece to the file name if this is
4191  # a spread-out dataset.
4192  if self.datasets_information[dataset_name] \
4193  ["mirrored"][run_number] == False:
4194  output_file_name = output_file_name.replace(".root", \
4195  "_partial.root")
4196 
4197  # End of create_harvesting_output_file_name.
4198  return output_file_name
4199 
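 # A sketch of the resulting file name for run 123456 of the
 # made-up dataset `/Cosmics/Commissioning10-v1/RAW':
 #
 #   DQM_V0001_R000123456__Cosmics__Commissioning10-v1__RAW.root
 #
 # with `.root' turned into `_partial.root' in partial-statistics
 # mode for runs that are not mirrored.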
4200  ##########
4201 
4202  def create_me_summary_output_file_name(self, dataset_name):
4203  """Generate the name of the intermediate ME file name to be
4204  used in two-step harvesting.
4205 
4206  """
4207 
4208  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4209  output_file_name = "me_summary_%s.root" % \
4210  dataset_name_escaped
4211 
4212  # End of create_me_summary_output_file_name.
4213  return output_file_name
4214 
4215  ##########
4216 
4217  def create_multicrab_block_name(self, dataset_name, run_number, index):
4218  """Create the block name to use for this dataset/run number.
4219 
4220  This is what appears in the brackets `[]' in multicrab.cfg. It
4221  is used as the name of the job and to create output
4222  directories.
4223 
4224  """
4225 
4226  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4227  block_name = "%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4228 
4229  # End of create_multicrab_block_name.
4230  return block_name
4231 
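 # A sketch of the block name for the same made-up dataset and run
 # as above, with index `site_01':
 #
 #   Cosmics__Commissioning10-v1__RAW_000123456_site_01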
4232  ##########
4233 
 4234  def create_crab_config(self):
 4235  """Create a CRAB configuration for a given job.
4236 
4237  NOTE: This is _not_ a complete (as in: submittable) CRAB
4238  configuration. It is used to store the common settings for the
4239  multicrab configuration.
4240 
4241  NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4242 
4243  NOTE: According to CRAB, you `Must define exactly two of
4244  total_number_of_events, events_per_job, or
4245  number_of_jobs.'. For single-step harvesting we force one job,
4246  for the rest we don't really care.
4247 
4248  # BUG BUG BUG
4249  # With the current version of CRAB (2.6.1), in which Daniele
4250  # fixed the behaviour of no_block_boundary for me, one _has to
4251  # specify_ the total_number_of_events and one single site in
4252  # the se_white_list.
4253  # BUG BUG BUG end
4254 
4255  """
4256 
4257  tmp = []
4258 
4259  # This is the stuff we will need to fill in.
4260  castor_prefix = self.castor_prefix
4261 
4262  tmp.append(self.config_file_header())
4263  tmp.append("")
4264 
4265  ## CRAB
4266  ##------
4267  tmp.append("[CRAB]")
4268  tmp.append("jobtype = cmssw")
4269  tmp.append("")
4270 
4271  ## GRID
4272  ##------
4273  tmp.append("[GRID]")
4274  tmp.append("virtual_organization=cms")
4275  tmp.append("")
4276 
4277  ## USER
4278  ##------
4279  tmp.append("[USER]")
4280  tmp.append("copy_data = 1")
4281  tmp.append("")
4282 
4283  ## CMSSW
4284  ##-------
4285  tmp.append("[CMSSW]")
4286  tmp.append("# This reveals data hosted on T1 sites,")
4287  tmp.append("# which is normally hidden by CRAB.")
4288  tmp.append("show_prod = 1")
4289  tmp.append("number_of_jobs = 1")
4290  if self.Jsonlumi == True:
4291  tmp.append("lumi_mask = %s" % self.Jsonfilename)
4292  tmp.append("total_number_of_lumis = -1")
4293  else:
4294  if self.harvesting_type == "DQMOffline":
4295  tmp.append("total_number_of_lumis = -1")
4296  else:
4297  tmp.append("total_number_of_events = -1")
4298  if self.harvesting_mode.find("single-step") > -1:
4299  tmp.append("# Force everything to run in one job.")
4300  tmp.append("no_block_boundary = 1")
4301  tmp.append("")
4302 
4303  ## CAF
4304  ##-----
4305  tmp.append("[CAF]")
4306 
4307  crab_config = "\n".join(tmp)
4308 
4309  # End of create_crab_config.
4310  return crab_config
4311 
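 # A sketch of the crab.cfg skeleton assembled above for a plain
 # (non-JSON) DQMOffline job in single-step mode, with the header
 # and comment lines left out:
 #
 #   [CRAB]
 #   jobtype = cmssw
 #
 #   [GRID]
 #   virtual_organization=cms
 #
 #   [USER]
 #   copy_data = 1
 #
 #   [CMSSW]
 #   show_prod = 1
 #   number_of_jobs = 1
 #   total_number_of_lumis = -1
 #   no_block_boundary = 1
 #
 #   [CAF]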
4312  ##########
4313 
 4314  def create_multicrab_config(self):
 4315  """Create a multicrab.cfg file for all samples.
4316 
4317  This creates the contents for a multicrab.cfg file that uses
4318  the crab.cfg file (generated elsewhere) for the basic settings
4319  and contains blocks for each run of each dataset.
4320 
4321  # BUG BUG BUG
4322  # The fact that it's necessary to specify the se_white_list
4323  # and the total_number_of_events is due to our use of CRAB
4324  # version 2.6.1. This should no longer be necessary in the
4325  # future.
4326  # BUG BUG BUG end
4327 
4328  """
4329 
 4330  cmd = "who am i | cut -f1 -d' '"
4331  (status, output)=commands.getstatusoutput(cmd)
4332  UserName = output
4333 
4334  if self.caf_access == True:
4335  print "Extracting %s as user name" %UserName
4336 
4337  number_max_sites = self.nr_max_sites + 1
4338 
4339  multicrab_config_lines = []
4340  multicrab_config_lines.append(self.config_file_header())
4341  multicrab_config_lines.append("")
4342  multicrab_config_lines.append("[MULTICRAB]")
4343  multicrab_config_lines.append("cfg = crab.cfg")
4344  multicrab_config_lines.append("")
4345 
4346  dataset_names = self.datasets_to_use.keys()
4347  dataset_names.sort()
4348 
4349  for dataset_name in dataset_names:
4350  runs = self.datasets_to_use[dataset_name]
4351  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4352  castor_prefix = self.castor_prefix
4353 
4354  for run in runs:
4355 
4356  # CASTOR output dir.
4357  castor_dir = self.datasets_information[dataset_name] \
4358  ["castor_path"][run]
4359 
4360  cmd = "rfdir %s" % castor_dir
4361  (status, output) = commands.getstatusoutput(cmd)
4362 
4363  if len(output) <= 0:
4364 
4365  # DEBUG DEBUG DEBUG
4366  # We should only get here if we're treating a
4367  # dataset/run that is fully contained at a single
4368  # site.
4369  assert (len(self.datasets_information[dataset_name] \
4370  ["sites"][run]) == 1) or \
 4371  self.datasets_information[dataset_name]["mirrored"][run]
4372  # DEBUG DEBUG DEBUG end
4373 
4374  site_names = self.datasets_information[dataset_name] \
4375  ["sites"][run].keys()
4376 
4377  for i in range(1, number_max_sites, 1):
4378  if len(site_names) > 0:
4379  index = "site_%02d" % (i)
4380 
4381  config_file_name = self. \
4382  create_config_file_name(dataset_name, run)
4383  output_file_name = self. \
4384  create_output_file_name(dataset_name, run)
4385 
4386 
4387  # If we're looking at a mirrored dataset we just pick
4388  # one of the sites. Otherwise there is nothing to
4389  # choose.
4390 
4391  # Loop variable
4392  loop = 0
4393 
4394  if len(site_names) > 1:
4395  cmssw_version = self.datasets_information[dataset_name] \
4396  ["cmssw_version"]
4397  self.logger.info("Picking site for mirrored dataset " \
4398  "`%s', run %d" % \
4399  (dataset_name, run))
4400  site_name = self.pick_a_site(site_names, cmssw_version)
4401  if site_name in site_names:
4402  site_names.remove(site_name)
4403 
4404  else:
4405  site_name = site_names[0]
4406  site_names.remove(site_name)
4407 
 4408  if site_name == self.no_matching_site_found_str:
4409  if loop < 1:
4410  break
4411 
4412  nevents = self.datasets_information[dataset_name]["num_events"][run]
4413 
4414  # The block name.
4415  multicrab_block_name = self.create_multicrab_block_name( \
4416  dataset_name, run, index)
4417  multicrab_config_lines.append("[%s]" % \
4418  multicrab_block_name)
4419 
4420  ## CRAB
4421  ##------
4422  if site_name == "caf.cern.ch":
4423  multicrab_config_lines.append("CRAB.use_server=0")
4424  multicrab_config_lines.append("CRAB.scheduler=caf")
4425  else:
4426  multicrab_config_lines.append("scheduler = glite")
4427 
4428  ## GRID
4429  ##------
4430  if site_name == "caf.cern.ch":
4431  pass
4432  else:
4433  multicrab_config_lines.append("GRID.se_white_list = %s" % \
4434  site_name)
4435  multicrab_config_lines.append("# This removes the default blacklisting of T1 sites.")
4436  multicrab_config_lines.append("GRID.remove_default_blacklist = 1")
4437  multicrab_config_lines.append("GRID.rb = CERN")
4438  if not self.non_t1access:
4439  multicrab_config_lines.append("GRID.role = t1access")
4440 
4441  ## USER
4442  ##------
4443 
4444  castor_dir = castor_dir.replace(castor_prefix, "")
4445  multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4446  multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4447  castor_dir)
4448  multicrab_config_lines.append("USER.check_user_remote_dir=0")
4449 
4450  if site_name == "caf.cern.ch":
4451  multicrab_config_lines.append("USER.storage_path=%s" % castor_prefix)
4452  #multicrab_config_lines.append("USER.storage_element=T2_CH_CAF")
4453  #castor_dir = castor_dir.replace("/cms/store/caf/user/%s" %UserName, "")
4454  #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4455  # castor_dir)
4456  else:
4457  multicrab_config_lines.append("USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4458  #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4459  # castor_dir)
4460  #multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4461 
4462  ## CMSSW
4463  ##-------
4464  multicrab_config_lines.append("CMSSW.pset = %s" % \
4465  config_file_name)
4466  multicrab_config_lines.append("CMSSW.datasetpath = %s" % \
4467  dataset_name)
4468  multicrab_config_lines.append("CMSSW.runselection = %d" % \
4469  run)
4470 
4471  if self.Jsonlumi == True:
4472  pass
4473  else:
4474  if self.harvesting_type == "DQMOffline":
4475  pass
4476  else:
4477  multicrab_config_lines.append("CMSSW.total_number_of_events = %d" % \
4478  nevents)
4479  # The output file name.
4480  multicrab_config_lines.append("CMSSW.output_file = %s" % \
4481  output_file_name)
4482 
4483  ## CAF
4484  ##-----
4485  if site_name == "caf.cern.ch":
4486  multicrab_config_lines.append("CAF.queue=cmscaf1nd")
4487 
4488 
4489  # End of block.
4490  multicrab_config_lines.append("")
4491 
4492  loop = loop + 1
4493 
4494  self.all_sites_found = True
4495 
4496  multicrab_config = "\n".join(multicrab_config_lines)
4497 
4498  # End of create_multicrab_config.
4499  return multicrab_config
4500 
4501  ##########
4502 
4503  def check_globaltag(self, globaltag=None):
4504  """Check if globaltag exists.
4505 
4506  Check if globaltag exists as GlobalTag in the database given
4507  by self.frontier_connection_name['globaltag']. If globaltag is
4508  None, self.globaltag is used instead.
4509 
4510  If we're going to use reference histograms this method also
4511  checks for the existence of the required key in the GlobalTag.
4512 
4513  """
4514 
4515  if globaltag is None:
4516  globaltag = self.globaltag
4517 
4518  # All GlobalTags should end in `::All', right?
4519  if globaltag.endswith("::All"):
4520  globaltag = globaltag[:-5]
4521 
4522  connect_name = self.frontier_connection_name["globaltag"]
4523  # BUG BUG BUG
4524  # There is a bug in cmscond_tagtree_list: some magic is
4525  # missing from the implementation requiring one to specify
4526  # explicitly the name of the squid to connect to. Since the
4527  # cmsHarvester can only be run from the CERN network anyway,
4528  # cmsfrontier:8000 is hard-coded in here. Not nice but it
4529  # works.
4530  connect_name = connect_name.replace("frontier://",
4531  "frontier://cmsfrontier:8000/")
4532  # BUG BUG BUG end
4533  connect_name += self.db_account_name_cms_cond_globaltag()
4534 
4535  tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4536 
4537  #----------
4538 
4539  tag_contains_ref_hist_key = False
4540  if self.use_ref_hists and tag_exists:
4541  # Check for the key required to use reference histograms.
4542  tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4543 
4544  #----------
4545 
4546  if self.use_ref_hists:
4547  ret_val = tag_exists and tag_contains_ref_hist_key
4548  else:
4549  ret_val = tag_exists
4550 
4551  #----------
4552 
4553  # End of check_globaltag.
4554  return ret_val
4555 
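 # A sketch of a typical call, with a made-up GlobalTag name:
 #
 #   if not self.check_globaltag("GR_R_38X_V13::All"):
 #       # Complain and/or skip the dataset.
 #       pass
 #
 # The trailing `::All' is stripped before the database lookup.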
4556  ##########
4557 
4558  def check_globaltag_exists(self, globaltag, connect_name):
4559  """Check if globaltag exists.
4560 
4561  """
4562 
4563  self.logger.info("Checking existence of GlobalTag `%s'" % \
4564  globaltag)
4565  self.logger.debug(" (Using database connection `%s')" % \
4566  connect_name)
4567 
4568  cmd = "cmscond_tagtree_list -c %s -T %s" % \
4569  (connect_name, globaltag)
4570  (status, output) = commands.getstatusoutput(cmd)
4571  if status != 0 or \
4572  output.find("error") > -1:
4573  msg = "Could not check existence of GlobalTag `%s' in `%s'" % \
4574  (globaltag, connect_name)
4575  if output.find(".ALL_TABLES not found") > -1:
4576  msg = "%s\n" \
4577  "Missing database account `%s'" % \
4578  (msg, output.split(".ALL_TABLES")[0].split()[-1])
4579  self.logger.fatal(msg)
4580  self.logger.debug("Command used:")
4581  self.logger.debug(" %s" % cmd)
4582  self.logger.debug("Output received:")
4583  self.logger.debug(output)
4584  raise Error(msg)
4585  if output.find("does not exist") > -1:
4586  self.logger.debug("GlobalTag `%s' does not exist in `%s':" % \
4587  (globaltag, connect_name))
4588  self.logger.debug("Output received:")
4589  self.logger.debug(output)
4590  tag_exists = False
4591  else:
4592  tag_exists = True
4593  self.logger.info(" GlobalTag exists? -> %s" % tag_exists)
4594 
4595  # End of check_globaltag_exists.
4596  return tag_exists
4597 
4598  ##########
4599 
4600  def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name):
4601  """Check if globaltag contains the required RefHistos key.
4602 
4603  """
4604 
4605  # Check for the key required to use reference histograms.
4606  tag_contains_key = None
4607  ref_hist_key = "RefHistos"
4608  self.logger.info("Checking existence of reference " \
4609  "histogram key `%s' in GlobalTag `%s'" % \
4610  (ref_hist_key, globaltag))
4611  self.logger.debug(" (Using database connection `%s')" % \
4612  connect_name)
4613  cmd = "cmscond_tagtree_list -c %s -T %s -n %s" % \
4614  (connect_name, globaltag, ref_hist_key)
4615  (status, output) = commands.getstatusoutput(cmd)
4616  if status != 0 or \
4617  output.find("error") > -1:
 4618  msg = "Could not check existence of key `%s' in `%s'" % \
4619  (ref_hist_key, connect_name)
4620  self.logger.fatal(msg)
4621  self.logger.debug("Command used:")
4622  self.logger.debug(" %s" % cmd)
4623  self.logger.debug("Output received:")
4624  self.logger.debug(" %s" % output)
4625  raise Error(msg)
4626  if len(output) < 1:
4627  self.logger.debug("Required key for use of reference " \
4628  "histograms `%s' does not exist " \
4629  "in GlobalTag `%s':" % \
4630  (ref_hist_key, globaltag))
4631  self.logger.debug("Output received:")
4632  self.logger.debug(output)
4633  tag_contains_key = False
4634  else:
4635  tag_contains_key = True
4636 
4637  self.logger.info(" GlobalTag contains `%s' key? -> %s" % \
4638  (ref_hist_key, tag_contains_key))
4639 
4640  # End of check_globaltag_contains_ref_hist_key.
4641  return tag_contains_key
4642 
4643  ##########
4644 
4645  def check_ref_hist_tag(self, tag_name):
4646  """Check the existence of tag_name in database connect_name.
4647 
4648  Check if tag_name exists as a reference histogram tag in the
4649  database given by self.frontier_connection_name['refhists'].
4650 
4651  """
4652 
4653  connect_name = self.frontier_connection_name["refhists"]
4654  connect_name += self.db_account_name_cms_cond_dqm_summary()
4655 
4656  self.logger.debug("Checking existence of reference " \
4657  "histogram tag `%s'" % \
4658  tag_name)
4659  self.logger.debug(" (Using database connection `%s')" % \
4660  connect_name)
4661 
4662  cmd = "cmscond_list_iov -c %s" % \
4663  connect_name
4664  (status, output) = commands.getstatusoutput(cmd)
4665  if status != 0:
4666  msg = "Could not check existence of tag `%s' in `%s'" % \
4667  (tag_name, connect_name)
4668  self.logger.fatal(msg)
4669  self.logger.debug("Command used:")
4670  self.logger.debug(" %s" % cmd)
4671  self.logger.debug("Output received:")
4672  self.logger.debug(output)
4673  raise Error(msg)
4674  if not tag_name in output.split():
4675  self.logger.debug("Reference histogram tag `%s' " \
4676  "does not exist in `%s'" % \
4677  (tag_name, connect_name))
4678  self.logger.debug(" Existing tags: `%s'" % \
4679  "', `".join(output.split()))
4680  tag_exists = False
4681  else:
4682  tag_exists = True
4683  self.logger.debug(" Reference histogram tag exists? " \
4684  "-> %s" % tag_exists)
4685 
4686  # End of check_ref_hist_tag.
4687  return tag_exists
4688 
4689  ##########
4690 
4691  def create_es_prefer_snippet(self, dataset_name):
4692  """Build the es_prefer snippet for the reference histograms.
4693 
4694  The building of the snippet is wrapped in some care-taking
4695  code that figures out the name of the reference histogram set
4696  and makes sure the corresponding tag exists.
4697 
4698  """
4699 
4700  # Figure out the name of the reference histograms tag.
4701  # NOTE: The existence of these tags has already been checked.
4702  ref_hist_tag_name = self.ref_hist_mappings[dataset_name]
4703 
4704  connect_name = self.frontier_connection_name["refhists"]
4705  connect_name += self.db_account_name_cms_cond_dqm_summary()
4706  record_name = "DQMReferenceHistogramRootFileRcd"
4707 
4708  # Build up the code snippet.
4709  code_lines = []
4710  code_lines.append("from CondCore.DBCommon.CondDBSetup_cfi import *")
4711  code_lines.append("process.ref_hist_source = cms.ESSource(\"PoolDBESSource\", CondDBSetup,")
4712  code_lines.append(" connect = cms.string(\"%s\")," % connect_name)
4713  code_lines.append(" toGet = cms.VPSet(cms.PSet(record = cms.string(\"%s\")," % record_name)
4714  code_lines.append(" tag = cms.string(\"%s\"))," % ref_hist_tag_name)
4715  code_lines.append(" )")
4716  code_lines.append(" )")
4717  code_lines.append("process.es_prefer_ref_hist_source = cms.ESPrefer(\"PoolDBESSource\", \"ref_hist_source\")")
4718 
4719  snippet = "\n".join(code_lines)
4720 
4721  # End of create_es_prefer_snippet.
4722  return snippet
4723 
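 # A sketch of the snippet built above for a made-up reference
 # histogram tag `RefHistTag_Example' (connection string shortened):
 #
 #   from CondCore.DBCommon.CondDBSetup_cfi import *
 #   process.ref_hist_source = cms.ESSource("PoolDBESSource", CondDBSetup,
 #       connect = cms.string("<refhists connect name + DB account>"),
 #       toGet = cms.VPSet(cms.PSet(record = cms.string("DQMReferenceHistogramRootFileRcd"),
 #                                  tag = cms.string("RefHistTag_Example")),
 #                         )
 #       )
 #   process.es_prefer_ref_hist_source = cms.ESPrefer("PoolDBESSource", "ref_hist_source")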
4724  ##########
4725 
4726  def create_harvesting_config(self, dataset_name):
4727  """Create the Python harvesting configuration for harvesting.
4728 
4729  The basic configuration is created by
4730  Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4731  what cmsDriver.py does.) After that we add some specials
4732  ourselves.
4733 
4734  NOTE: On one hand it may not be nice to circumvent
4735  cmsDriver.py, on the other hand cmsDriver.py does not really
4736  do anything itself. All the real work is done by the
4737  ConfigBuilder so there is not much risk that we miss out on
4738  essential developments of cmsDriver in the future.
4739 
4740  """
4741 
4742  # Setup some options needed by the ConfigBuilder.
4743  config_options = defaultOptions
4744 
4745  # These are fixed for all kinds of harvesting jobs. Some of
4746  # them are not needed for the harvesting config, but to keep
4747  # the ConfigBuilder happy.
4748  config_options.name = "harvesting"
4749  config_options.scenario = "pp"
4750  config_options.number = 1
4751  config_options.arguments = self.ident_string()
4752  config_options.evt_type = config_options.name
4753  config_options.customisation_file = None
4754  config_options.filein = "dummy_value"
4755  config_options.filetype = "EDM"
4756  # This seems to be new in CMSSW 3.3.X, no clue what it does.
4757  config_options.gflash = "dummy_value"
4758  # This seems to be new in CMSSW 3.3.0.pre6, no clue what it
4759  # does.
4760  #config_options.himix = "dummy_value"
4761  config_options.dbsquery = ""
4762 
4763  ###
4764 
4765  # These options depend on the type of harvesting we're doing
4766  # and are stored in self.harvesting_info.
4767 
4768  config_options.step = "HARVESTING:%s" % \
4769  self.harvesting_info[self.harvesting_type] \
4770  ["step_string"]
4771  config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4772  ["beamspot"]
4773  config_options.eventcontent = self.harvesting_info \
4774  [self.harvesting_type] \
4775  ["eventcontent"]
4776  config_options.harvesting = self.harvesting_info \
4777  [self.harvesting_type] \
4778  ["harvesting"]
4779 
4780  ###
4781 
4782  # This one is required (see also above) for each dataset.
4783 
4784  datatype = self.datasets_information[dataset_name]["datatype"]
4785  config_options.isMC = (datatype.lower() == "mc")
4786  config_options.isData = (datatype.lower() == "data")
4787  globaltag = self.datasets_information[dataset_name]["globaltag"]
4788 
4789  config_options.conditions = self.format_conditions_string(globaltag)
4790 
4791  ###
4792 
4793  if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4794  # This is the case for 3.3.X.
4795  config_builder = ConfigBuilder(config_options, with_input=True)
4796  else:
4797  # This is the case in older CMSSW versions.
4798  config_builder = ConfigBuilder(config_options)
4799  config_builder.prepare(True)
4800  config_contents = config_builder.pythonCfgCode
4801 
4802  ###
4803 
4804  # Add our signature to the top of the configuration. and add
4805  # some markers to the head and the tail of the Python code
4806  # generated by the ConfigBuilder.
4807  marker_lines = []
4808  sep = "#" * 30
4809  marker_lines.append(sep)
4810  marker_lines.append("# Code between these markers was generated by")
4811  marker_lines.append("# Configuration.PyReleaseValidation." \
4812  "ConfigBuilder")
4813 
4814  marker_lines.append(sep)
4815  marker = "\n".join(marker_lines)
4816 
4817  tmp = [self.config_file_header()]
4818  tmp.append("")
4819  tmp.append(marker)
4820  tmp.append("")
4821  tmp.append(config_contents)
4822  tmp.append("")
4823  tmp.append(marker)
4824  tmp.append("")
4825  config_contents = "\n".join(tmp)
4826 
4827  ###
4828 
4829  # Now we add some stuff of our own.
4830  customisations = [""]
4831 
4832  customisations.append("# Now follow some customisations")
4833  customisations.append("")
4834  connect_name = self.frontier_connection_name["globaltag"]
4835  connect_name += self.db_account_name_cms_cond_globaltag()
4836  customisations.append("process.GlobalTag.connect = \"%s\"" % \
4837  connect_name)
4838 
4839 
4840  if self.saveByLumiSection == True:
4841  customisations.append("process.dqmSaver.saveByLumiSection = 1")
4842 
4844 
4845  customisations.append("")
4846 
4847  # About the reference histograms... For data there is only one
4848  # set of references and those are picked up automatically
4849  # based on the GlobalTag. For MC we have to do some more work
4850  # since the reference histograms to be used depend on the MC
4851  # sample at hand. In this case we glue in an es_prefer snippet
4852  # to pick up the references. We do this only for RelVals since
4853  # for MC there are no meaningful references so far.
4854 
4855  # NOTE: Due to the lack of meaningful references for
4856  # MC samples reference histograms are explicitly
4857  # switched off in this case.
4858 
4859  use_es_prefer = (self.harvesting_type == "RelVal")
4860  use_refs = use_es_prefer or \
4861  (not self.harvesting_type == "MC")
4862  # Allow global override.
4863  use_refs = use_refs and self.use_ref_hists
4864 
4865  if not use_refs:
4866  # Disable reference histograms explicitly. The histograms
4867  # are loaded by the dqmRefHistoRootFileGetter
4868  # EDAnalyzer. This analyzer can be run from several
4869  # sequences. Here we remove it from each sequence that
4870  # exists.
4871  customisations.append("print \"Not using reference histograms\"")
4872  customisations.append("if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4873  customisations.append(" for (sequence_name, sequence) in process.sequences.iteritems():")
4874  customisations.append(" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4875  customisations.append(" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4876  customisations.append(" sequence_name")
4877  customisations.append("process.dqmSaver.referenceHandling = \"skip\"")
4878  else:
4879  # This makes sure all reference histograms are saved to
4880  # the output ROOT file.
4881  customisations.append("process.dqmSaver.referenceHandling = \"all\"")
4882  if use_es_prefer:
4883  es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4884  customisations.append(es_prefer_snippet)
4885 
4886  # Make sure we get the `workflow' correct. As far as I can see
4887  # this is only important for the output file name.
4888  workflow_name = dataset_name
4889  if self.harvesting_mode == "single-step-allow-partial":
4890  workflow_name += "_partial"
4891  customisations.append("process.dqmSaver.workflow = \"%s\"" % \
4892  workflow_name)
4893 
4894  # BUG BUG BUG
4895  # This still does not work. The current two-step harvesting
4896  # efforts are on hold waiting for the solution to come from
4897  # elsewhere. (In this case the elsewhere is Daniele Spiga.)
4898 
4899 ## # In case this file is the second step (the real harvesting
4900 ## # step) of the two-step harvesting we have to tell it to use
4901 ## # our local files.
4902 ## if self.harvesting_mode == "two-step":
4903 ## castor_dir = self.datasets_information[dataset_name] \
4904 ## ["castor_path"][run]
4905 ## customisations.append("")
4906 ## customisations.append("# This is the second step (the real")
4907 ## customisations.append("# harvesting step) of a two-step")
4908 ## customisations.append("# harvesting procedure.")
4909 ## # BUG BUG BUG
4910 ## # To be removed in production version.
4911 ## customisations.append("import pdb")
4912 ## # BUG BUG BUG end
4913 ## customisations.append("import commands")
4914 ## customisations.append("import os")
4915 ## customisations.append("castor_dir = \"%s\"" % castor_dir)
4916 ## customisations.append("cmd = \"rfdir %s\" % castor_dir")
4917 ## customisations.append("(status, output) = commands.getstatusoutput(cmd)")
4918 ## customisations.append("if status != 0:")
4919 ## customisations.append(" print \"ERROR\"")
4920 ## customisations.append(" raise Exception, \"ERROR\"")
4921 ## customisations.append("file_names = [os.path.join(\"rfio:%s\" % path, i) for i in output.split() if i.startswith(\"EDM_summary\") and i.endswith(\".root\")]")
4922 ## #customisations.append("pdb.set_trace()")
4923 ## customisations.append("process.source.fileNames = cms.untracked.vstring(*file_names)")
4924 ## customisations.append("")
4925 
4926  # BUG BUG BUG end
4927 
4928  config_contents = config_contents + "\n".join(customisations)
4929 
4930  ###
4931 
4932  # End of create_harvesting_config.
4933  return config_contents
4934 
4935 ## ##########
4936 
4937 ## def create_harvesting_config_two_step(self, dataset_name):
4938 ## """Create the Python harvesting configuration for two-step
4939 ## harvesting.
4940 
4941 ## """
4942 
4943 ## # BUG BUG BUG
4944 ## config_contents = self.create_harvesting_config_single_step(dataset_name)
4945 ## # BUG BUG BUG end
4946 
4947 ## # End of create_harvesting_config_two_step.
4948 ## return config_contents
4949 
4950  ##########
4951 
4952  def create_me_extraction_config(self, dataset_name):
4953  """Create the ME-extraction config (first step of two-step harvesting).
4954 
4955  """
4956 
4957  # Big chunk of hard-coded Python. Not such a big deal since
4958  # this does not do much and is not likely to break.
4959  tmp = []
4960  tmp.append(self.config_file_header())
4961  tmp.append("")
4962  tmp.append("import FWCore.ParameterSet.Config as cms")
4963  tmp.append("")
4964  tmp.append("process = cms.Process(\"ME2EDM\")")
4965  tmp.append("")
4966  tmp.append("# Import of standard configurations")
4967  tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
4968  tmp.append("")
4969  tmp.append("# We don't really process any events, just keep this set to one to")
4970  tmp.append("# make sure things work.")
4971  tmp.append("process.maxEvents = cms.untracked.PSet(")
4972  tmp.append(" input = cms.untracked.int32(1)")
4973  tmp.append(" )")
4974  tmp.append("")
4975  tmp.append("process.options = cms.untracked.PSet(")
4976  tmp.append(" Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
4977  tmp.append(" )")
4978  tmp.append("")
4979  tmp.append("process.source = cms.Source(\"PoolSource\",")
4980  tmp.append(" processingMode = \\")
4981  tmp.append(" cms.untracked.string(\"RunsAndLumis\"),")
4982  tmp.append(" fileNames = \\")
4983  tmp.append(" cms.untracked.vstring(\"no_file_specified\")")
4984  tmp.append(" )")
4985  tmp.append("")
4986  tmp.append("# Output definition: drop everything except for the monitoring.")
4987  tmp.append("process.output = cms.OutputModule(")
4988  tmp.append(" \"PoolOutputModule\",")
4989  tmp.append(" outputCommands = \\")
4990  tmp.append(" cms.untracked.vstring(\"drop *\", \\")
4991  tmp.append(" \"keep *_MEtoEDMConverter_*_*\"),")
4992  output_file_name = self. \
4993  create_output_file_name(dataset_name)
4994  tmp.append(" fileName = \\")
4995  tmp.append(" cms.untracked.string(\"%s\")," % output_file_name)
4996  tmp.append(" dataset = cms.untracked.PSet(")
4997  tmp.append(" dataTier = cms.untracked.string(\"RECO\"),")
4998  tmp.append(" filterName = cms.untracked.string(\"\")")
4999  tmp.append(" )")
5000  tmp.append(" )")
5001  tmp.append("")
5002  tmp.append("# Additional output definition")
5003  tmp.append("process.out_step = cms.EndPath(process.output)")
5004  tmp.append("")
5005  tmp.append("# Schedule definition")
5006  tmp.append("process.schedule = cms.Schedule(process.out_step)")
5007  tmp.append("")
5008 
5009  config_contents = "\n".join(tmp)
5010 
5011  # End of create_me_extraction_config.
5012  return config_contents
5013 
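Joined together, the strings above produce an ME-extraction configuration of roughly the following shape (a sketch; the real file is preceded by the config_file_header() output and carries the per-dataset output file name instead of the placeholder shown here):

import FWCore.ParameterSet.Config as cms

process = cms.Process("ME2EDM")

# Import of standard configurations
process.load("Configuration/EventContent/EventContent_cff")

# We don't really process any events, just keep this set to one to
# make sure things work.
process.maxEvents = cms.untracked.PSet(
    input = cms.untracked.int32(1)
    )

process.options = cms.untracked.PSet(
    Rethrow = cms.untracked.vstring("ProductNotFound")
    )

process.source = cms.Source("PoolSource",
    processingMode = cms.untracked.string("RunsAndLumis"),
    fileNames = cms.untracked.vstring("no_file_specified")
    )

# Output definition: drop everything except for the monitoring.
process.output = cms.OutputModule("PoolOutputModule",
    outputCommands = cms.untracked.vstring("drop *",
                                           "keep *_MEtoEDMConverter_*_*"),
    fileName = cms.untracked.string("me_extraction.root"),  # placeholder; set per dataset
    dataset = cms.untracked.PSet(
        dataTier = cms.untracked.string("RECO"),
        filterName = cms.untracked.string("")
        )
    )

# Additional output definition
process.out_step = cms.EndPath(process.output)

# Schedule definition
process.schedule = cms.Schedule(process.out_step)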
5014  ##########
5015 
5016 ## def create_harvesting_config(self, dataset_name):
5017 ## """Create the Python harvesting configuration for a given job.
5018 
5019 ## NOTE: The reason to have a single harvesting configuration per
5020 ## sample is to be able to specify the GlobalTag corresponding to
5021 ## each sample. Since it has been decided that (apart from the
5022 ## prompt reco) datasets cannot contain runs with different
5023 ## GlobalTags, we don't need a harvesting config per run.
5024 
5025 ## NOTE: This is the place where we distinguish between
5026 ## single-step and two-step harvesting modes (at least for the
5027 ## Python job configuration).
5028 
5029 ## """
5030 
5031 ## ###
5032 
5033 ## if self.harvesting_mode == "single-step":
5034 ## config_contents = self.create_harvesting_config_single_step(dataset_name)
5035 ## elif self.harvesting_mode == "two-step":
5036 ## config_contents = self.create_harvesting_config_two_step(dataset_name)
5037 ## else:
5038 ## # Impossible harvesting mode, we should never get here.
5039 ## assert False, "ERROR: unknown harvesting mode `%s'" % \
5040 ## self.harvesting_mode
5041 
5042 ## ###
5043 
5044 ## # End of create_harvesting_config.
5045 ## return config_contents
5046 
5047  ##########
5048 
5049  def write_crab_config(self):
5050  """Write a CRAB job configuration Python file.
5051 
5052  """
5053 
5054  self.logger.info("Writing CRAB configuration...")
5055 
5056  file_name_base = "crab.cfg"
5057 
5058  # Create CRAB configuration.
5059  crab_contents = self.create_crab_config()
5060 
5061  # Write configuration to file.
5062  crab_file_name = file_name_base
5063  try:
5064  crab_file = file(crab_file_name, "w")
5065  crab_file.write(crab_contents)
5066  crab_file.close()
5067  except IOError:
5068  self.logger.fatal("Could not write " \
5069  "CRAB configuration to file `%s'" % \
5070  crab_file_name)
5071  raise Error("ERROR: Could not write to file `%s'!" % \
5072  crab_file_name)
5073 
5074  # End of write_crab_config.
5075 
5076  ##########
5077 
5078  def write_multicrab_config(self):
5079  """Write a multi-CRAB job configuration Python file.
5080 
5081  """
5082 
5083  self.logger.info("Writing multi-CRAB configuration...")
5084 
5085  file_name_base = "multicrab.cfg"
5086 
5087  # Create multi-CRAB configuration.
5088  multicrab_contents = self.create_multicrab_config()
5089 
5090  # Write configuration to file.
5091  multicrab_file_name = file_name_base
5092  try:
5093  multicrab_file = file(multicrab_file_name, "w")
5094  multicrab_file.write(multicrab_contents)
5095  multicrab_file.close()
5096  except IOError:
5097  self.logger.fatal("Could not write " \
5098  "multi-CRAB configuration to file `%s'" % \
5099  multicrab_file_name)
5100  raise Error("ERROR: Could not write to file `%s'!" % \
5101  multicrab_file_name)
5102 
5103  # End of write_multicrab_config.
5104 
5105  ##########
5106 
5107  def write_harvesting_config(self, dataset_name):
5108  """Write a harvesting job configuration Python file.
5109 
5110  NOTE: This knows nothing about single-step or two-step
5111  harvesting. That's all taken care of by
5112  create_harvesting_config.
5113 
5114  """
5115 
5116  self.logger.debug("Writing harvesting configuration for `%s'..." % \
5117  dataset_name)
5118 
5119  # Create Python configuration.
5120  config_contents = self.create_harvesting_config(dataset_name)
5121 
5122  # Write configuration to file.
5123  config_file_name = self. \
5124  create_harvesting_config_file_name(dataset_name)
5125  try:
5126  config_file = file(config_file_name, "w")
5127  config_file.write(config_contents)
5128  config_file.close()
5129  except IOError:
5130  self.logger.fatal("Could not write " \
5131  "harvesting configuration to file `%s'" % \
5132  config_file_name)
5133  raise Error("ERROR: Could not write to file `%s'!" % \
5134  config_file_name)
5135 
5136  # End of write_harvesting_config.
5137 
5138  ##########
5139 
5140  def write_me_extraction_config(self, dataset_name):
5141  """Write an ME-extraction configuration Python file.
5142 
5143  This `ME-extraction' (ME = Monitoring Element) is the first
5144  step of the two-step harvesting.
5145 
5146  """
5147 
5148  self.logger.debug("Writing ME-extraction configuration for `%s'..." % \
5149  dataset_name)
5150 
5151  # Create Python configuration.
5152  config_contents = self.create_me_extraction_config(dataset_name)
5153 
5154  # Write configuration to file.
5155  config_file_name = self. \
5156  create_me_summary_config_file_name(dataset_name)
5157  try:
5158  config_file = file(config_file_name, "w")
5159  config_file.write(config_contents)
5160  config_file.close()
5161  except IOError:
5162  self.logger.fatal("Could not write " \
5163  "ME-extraction configuration to file `%s'" % \
5164  config_file_name)
5165  raise Error("ERROR: Could not write to file `%s'!" % \
5166  config_file_name)
5167 
5168  # End of write_me_extraction_config.
5169 
5170  ##########
5171 
5172 
5173  def ref_hist_mappings_needed(self, dataset_name=None):
5174  """Check if we need to load and check the reference mappings.
5175 
5176  For data the reference histograms should be taken
5177  automatically from the GlobalTag, so we don't need any
5178  mappings. For RelVals we need to know a mapping to be used in
5179  the es_prefer code snippet (different references for each of
5180  the datasets).
5181 
5182  WARNING: This implementation is a bit convoluted.
5183 
5184  """
5185 
5186  # If no dataset name given, do everything, otherwise check
5187  # only this one dataset.
5188  if not dataset_name is None:
5189  data_type = self.datasets_information[dataset_name] \
5190  ["datatype"]
5191  mappings_needed = (data_type == "mc")
5192  # DEBUG DEBUG DEBUG
5193  if not mappings_needed:
5194  assert data_type == "data"
5195  # DEBUG DEBUG DEBUG end
5196  else:
5197  tmp = [self.ref_hist_mappings_needed(dataset_name) \
5198  for dataset_name in \
5199  self.datasets_information.keys()]
5200  mappings_needed = (True in tmp)
5201 
5202  # End of ref_hist_mappings_needed.
5203  return mappings_needed
5204 
5205  ##########
5206 
5207  def load_ref_hist_mappings(self):
5208  """Load the reference histogram mappings from file.
5209 
5210  The dataset name to reference histogram name mappings are read
5211  from a text file specified in self.ref_hist_mappings_file_name.
5212 
5213  """
5214 
5215  # DEBUG DEBUG DEBUG
5216  assert len(self.ref_hist_mappings) < 1, \
5217  "ERROR Should not be RE-loading " \
5218  "reference histogram mappings!"
5219  # DEBUG DEBUG DEBUG end
5220 
5221  self.logger.info("Loading reference histogram mappings " \
5222  "from file `%s'" % \
5223  self.ref_hist_mappings_file_name)
5224 
5225  mappings_lines = None
5226  try:
5227  mappings_file = file(self.ref_hist_mappings_file_name, "r")
5228  mappings_lines = mappings_file.readlines()
5229  mappings_file.close()
5230  except IOError:
5231  msg = "ERROR: Could not open reference histogram mapping "\
5232  "file `%s'" % self.ref_hist_mappings_file_name
5233  self.logger.fatal(msg)
5234  raise Error(msg)
5235 
5236  ##########
5237 
5238  # The format we expect is: two white-space separated pieces
5239  # per line. The first the dataset name for which the reference
5240  # should be used, the second one the name of the reference
5241  # histogram in the database.
5242 
5243  for mapping in mappings_lines:
5244  # Skip comment lines.
5245  if not mapping.startswith("#"):
5246  mapping = mapping.strip()
5247  if len(mapping) > 0:
5248  mapping_pieces = mapping.split()
5249  if len(mapping_pieces) != 2:
5250  msg = "ERROR: The reference histogram mapping " \
5251  "file contains a line I don't " \
5252  "understand:\n %s" % mapping
5253  self.logger.fatal(msg)
5254  raise Error(msg)
5255  dataset_name = mapping_pieces[0].strip()
5256  ref_hist_name = mapping_pieces[1].strip()
5257  # We don't want people to accidentally specify
5258  # multiple mappings for the same dataset. Just
5259  # don't accept those cases.
5260  if dataset_name in self.ref_hist_mappings:
5261  msg = "ERROR: The reference histogram mapping " \
5262  "file contains multiple mappings for " \
5263  "dataset `%s'." % dataset_name
5264  self.logger.fatal(msg)
5265  raise Error(msg)
5266 
5267  # All is well that ends well.
5268  self.ref_hist_mappings[dataset_name] = ref_hist_name
5269 
5270  ##########
5271 
5272  self.logger.info(" Successfully loaded %d mapping(s)" % \
5273  len(self.ref_hist_mappings))
5274  max_len = max([len(i) for i in self.ref_hist_mappings.keys()])
5275  for (map_from, map_to) in self.ref_hist_mappings.iteritems():
5276  self.logger.info(" %-*s -> %s" % \
5277  (max_len, map_from, map_to))
5278 
5279  # End of load_ref_hist_mappings.
5280 
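As a concrete illustration of the mapping-file format parsed above (two whitespace-separated columns: dataset name, then reference histogram name), here is a hypothetical entry and what it turns into; both names are made up:

# Hypothetical mapping-file line; the dataset and reference histogram
# names are invented for illustration only.
example_line = "/RelValTTbar/CMSSW_X_Y_Z-START_VN-v1/GEN-SIM-RECO  TTbar_reference"
dataset_name, ref_hist_name = example_line.split()
# load_ref_hist_mappings() would then store:
#   self.ref_hist_mappings[dataset_name] = ref_hist_name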
5281  ##########
5282 
5283  def check_ref_hist_mappings(self):
5284  """Make sure all necessary reference histograms exist.
5285 
5286  Check that for each of the datasets to be processed a
5287  reference histogram is specified and that that histogram
5288  exists in the database.
5289 
5290  NOTE: There's a little complication here. Since this whole
5291  thing was designed to allow (in principle) harvesting of both
5292  data and MC datasets in one go, we need to be careful to check
5293  the availability of reference mappings only for those
5294  datasets that need them.
5295 
5296  """
5297 
5298  self.logger.info("Checking reference histogram mappings")
5299 
5300  for dataset_name in self.datasets_to_use:
5301  try:
5302  ref_hist_name = self.ref_hist_mappings[dataset_name]
5303  except KeyError:
5304  msg = "ERROR: No reference histogram mapping found " \
5305  "for dataset `%s'" % \
5306  dataset_name
5307  self.logger.fatal(msg)
5308  raise Error(msg)
5309 
5310  if not self.check_ref_hist_tag(ref_hist_name):
5311  msg = "Reference histogram tag `%s' " \
5312  "(used for dataset `%s') does not exist!" % \
5313  (ref_hist_name, dataset_name)
5314  self.logger.fatal(msg)
5315  raise Usage(msg)
5316 
5317  self.logger.info(" Done checking reference histogram mappings.")
5318 
5319  # End of check_ref_hist_mappings.
5320 
5321  ##########
5322 
5323  def build_datasets_information(self):
5324  """Obtain all information on the datasets that we need to run.
5325 
5326  Use DBS to figure out all required information on our
5327  datasets, like the run numbers and the GlobalTag. All
5328  information is stored in the datasets_information member
5329  variable.
5330 
5331  """
5332 
5333  # Get a list of runs in the dataset.
5334  # NOTE: The harvesting has to be done run-by-run, so we
5335  # split up datasets based on the run numbers. Strictly
5336  # speaking this is not (yet?) necessary for Monte Carlo
5337  # since all those samples use run number 1. Still, this
5338  # general approach should work for all samples.
5339 
5340  # Now loop over all datasets in the list and process them.
5341  # NOTE: This processing has been split into several loops
5342  # to be easier to follow, sacrificing a bit of efficiency.
5343  self.datasets_information = {}
5344  self.logger.info("Collecting information for all datasets to process")
5345  dataset_names = self.datasets_to_use.keys()
5346  dataset_names.sort()
5347  for dataset_name in dataset_names:
5348 
5349  # Tell the user which dataset: nice with many datasets.
5350  sep_line = "-" * 30
5351  self.logger.info(sep_line)
5352  self.logger.info(" `%s'" % dataset_name)
5353  self.logger.info(sep_line)
5354 
5355  runs = self.dbs_resolve_runs(dataset_name)
5356  self.logger.info(" found %d run(s)" % len(runs))
5357  if len(runs) > 0:
5358  self.logger.debug(" run number(s): %s" % \
5359  ", ".join([str(i) for i in runs]))
5360  else:
5361  # DEBUG DEBUG DEBUG
5362  # This should never happen after the DBS checks.
5363  self.logger.warning(" --> skipping dataset "
5364  "without any runs")
5365  assert False, "Panic: found a dataset without runs " \
5366  "after DBS checks!"
5367  # DEBUG DEBUG DEBUG end
5368 
5369  cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5370  self.logger.info(" found CMSSW version `%s'" % cmssw_version)
5371 
5372  # Figure out if this is data or MC.
5373  datatype = self.dbs_resolve_datatype(dataset_name)
5374  self.logger.info(" sample is data or MC? --> %s" % \
5375  datatype)
5376 
5377  ###
5378 
5379  # Try and figure out the GlobalTag to be used.
5380  if self.globaltag is None:
5381  globaltag = self.dbs_resolve_globaltag(dataset_name)
5382  else:
5383  globaltag = self.globaltag
5384 
5385  self.logger.info(" found GlobalTag `%s'" % globaltag)
5386 
5387  # DEBUG DEBUG DEBUG
5388  if globaltag == "":
5389  # Actually we should not even reach this point, after
5390  # our dataset sanity checks.
5391  assert datatype == "data", \
5392  "ERROR Empty GlobalTag for MC dataset!!!"
5393  # DEBUG DEBUG DEBUG end
5394 
5395  ###
5396 
5397  # DEBUG DEBUG DEBUG
5398  #tmp = self.dbs_check_dataset_spread_old(dataset_name)
5399  # DEBUG DEBUG DEBUG end
5400  sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5401 
5402  # Extract the total event counts.
5403  num_events = {}
5404  for run_number in sites_catalog.keys():
5405  num_events[run_number] = sites_catalog \
5406  [run_number]["all_sites"]
5407  del sites_catalog[run_number]["all_sites"]
5408 
5409  # Extract the information about whether or not datasets
5410  # are mirrored.
5411  mirror_catalog = {}
5412  for run_number in sites_catalog.keys():
5413  mirror_catalog[run_number] = sites_catalog \
5414  [run_number]["mirrored"]
5415  del sites_catalog[run_number]["mirrored"]
5416 
5417  # BUG BUG BUG
5418  # I think I could now get rid of that and just fill the
5419  # "sites" entry with the `inverse' of this
5420  # num_events_catalog(?).
5421  #num_sites = self.dbs_resolve_dataset_number_of_sites(dataset_name)
5422  #sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5423  #sites_catalog = dict(zip(num_events_catalog.keys(),
5424  # [[j for i in num_events_catalog.values() for j in i.keys()]]))
5425  # BUG BUG BUG end
5426 
5427 ## # DEBUG DEBUG DEBUG
5428 ## # This is probably only useful to make sure we don't muck
5429 ## # things up, right?
5430 ## # Figure out across how many sites this sample has been spread.
5431 ## if num_sites == 1:
5432 ## self.logger.info(" sample is contained at a single site")
5433 ## else:
5434 ## self.logger.info(" sample is spread across %d sites" % \
5435 ## num_sites)
5436 ## if num_sites < 1:
5437 ## # NOTE: This _should not_ happen with any valid dataset.
5438 ## self.logger.warning(" --> skipping dataset which is not " \
5439 ## "hosted anywhere")
5440 ## # DEBUG DEBUG DEBUG end
5441 
5442  # Now put everything in a place where we can find it again
5443  # if we need it.
5444  self.datasets_information[dataset_name] = {}
5445  self.datasets_information[dataset_name]["runs"] = runs
5446  self.datasets_information[dataset_name]["cmssw_version"] = \
5447  cmssw_version
5448  self.datasets_information[dataset_name]["globaltag"] = globaltag
5449  self.datasets_information[dataset_name]["datatype"] = datatype
5450  self.datasets_information[dataset_name]["num_events"] = num_events
5451  self.datasets_information[dataset_name]["mirrored"] = mirror_catalog
5452  self.datasets_information[dataset_name]["sites"] = sites_catalog
5453 
5454  # Each run of each dataset has a different CASTOR output
5455  # path.
5456  castor_path_common = self.create_castor_path_name_common(dataset_name)
5457  self.logger.info(" output will go into `%s'" % \
5458  castor_path_common)
5459 
5460  castor_paths = dict(list(zip(runs,
5461  [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5462  for i in runs])))
5463  for path_name in castor_paths.values():
5464  self.logger.debug(" %s" % path_name)
5465  self.datasets_information[dataset_name]["castor_path"] = \
5466  castor_paths
5467 
5468  # End of build_datasets_information.
5469 
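For orientation, one entry of the datasets_information dictionary assembled above ends up with the following keys (the dataset name, run number and values shown here are hypothetical; the per-site structure returned by dbs_check_dataset_spread() is not spelled out):

# Illustrative shape of one self.datasets_information entry.
datasets_information = {
    "/SomeDataset/SomeEra-vN/GEN-SIM-RECO": {
        "runs": [142000],
        "cmssw_version": "CMSSW_3_8_2",
        "globaltag": "GR_R_38X_V9::All",
        "datatype": "data",
        "num_events": {142000: 12345},   # per run
        "mirrored": {142000: False},     # per run
        "sites": {142000: {}},           # per run; per-site info from DBS
        "castor_path": {142000: "/castor/cern.ch/some/output/path"},
        }
    }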
5470  ##########
5471 
5472  def show_exit_message(self):
5473  """Tell the user what to do now, after this part is done.
5474 
5475  This should provide the user with some (preferably
5476  copy-pasteable) instructions on what to do now with the setups
5477  and files that have been created.
5478 
5479  """
5480 
5481  # TODO TODO TODO
5482  # This could be improved a bit.
5483  # TODO TODO TODO end
5484 
5485  sep_line = "-" * 60
5486 
5487  self.logger.info("")
5488  self.logger.info(sep_line)
5489  self.logger.info(" Configuration files have been created.")
5490  self.logger.info(" From here on please follow the usual CRAB instructions.")
5491  self.logger.info(" Quick copy-paste instructions are shown below.")
5492  self.logger.info(sep_line)
5493 
5494  self.logger.info("")
5495  self.logger.info(" Create all CRAB jobs:")
5496  self.logger.info(" multicrab -create")
5497  self.logger.info("")
5498  self.logger.info(" Submit all CRAB jobs:")
5499  self.logger.info(" multicrab -submit")
5500  self.logger.info("")
5501  self.logger.info(" Check CRAB status:")
5502  self.logger.info(" multicrab -status")
5503  self.logger.info("")
5504 
5505  self.logger.info("")
5506  self.logger.info(" For more information please see the CMS Twiki:")
5507  self.logger.info(" %s" % twiki_url)
5508  self.logger.info(sep_line)
5509 
5510  # If there were any jobs for which we could not find a
5511  # matching site show a warning message about that.
5512  if not self.all_sites_found:
5513  self.logger.warning(" For some of the jobs no matching " \
5514  "site could be found")
5515  self.logger.warning(" --> please scan your multicrab.cfg " \
5516  "for occurrences of `%s'." % \
5517  self.no_matching_site_found_str)
5518  self.logger.warning(" You will have to fix those " \
5519  "by hand, sorry.")
5520 
5521  # End of show_exit_message.
5522 
5523  ##########
5524 
5525  def run(self):
5526  "Main entry point of the CMS harvester."
5527 
5528  # Start with a positive thought.
5529  exit_code = 0
5530 
5531  try:
5532 
5533  try:
5534 
5535  # Parse all command line options and arguments
5536  self.parse_cmd_line_options()
5537  # and check that they make sense.
5538  self.check_input_status()
5539 
5540  # Check if CMSSW is setup.
5541  self.check_cmssw()
5542 
5543  # Check if DBS is setup,
5544  self.check_dbs()
5545  # and if all is fine setup the Python side.
5546  self.setup_dbs()
5547 
5548  # Fill our dictionary with all the required info we
5549  # need to understand harvesting jobs. This needs to be
5550  # done after the CMSSW version is known.
5551  self.setup_harvesting_info()
5552 
5553  # Obtain list of dataset names to consider
5554  self.build_dataset_use_list()
5555  # and the list of dataset names to ignore.
5556  self.build_dataset_ignore_list()
5557 
5558  # The same for the runs lists (if specified).
5559  self.build_runs_use_list()
5560  self.build_runs_ignore_list()
5561 
5562  # Process the list of datasets to ignore and fold that
5563  # into the list of datasets to consider.
5564  # NOTE: The run-based selection is done later since
5565  # right now we don't know yet which runs a dataset
5566  # contains.
5567  self.process_dataset_ignore_list()
5568 
5569  # Obtain all required information on the datasets,
5570  # like run numbers and GlobalTags.
5571  self.build_datasets_information()
5572 
5573  if self.use_ref_hists and \
5574  self.ref_hist_mappings_needed():
5575  # Load the dataset name to reference histogram
5576  # name mappings from file.
5577  self.load_ref_hist_mappings()
5578  # Now make sure that for all datasets we want to
5579  # process there is a reference defined. Otherwise
5580  # just bomb out before wasting any more time.
5581  self.check_ref_hist_mappings()
5582  else:
5583  self.logger.info("No need to load reference " \
5584  "histogram mappings file")
5585 
5586  # OBSOLETE OBSOLETE OBSOLETE
5587 ## # TODO TODO TODO
5588 ## # Need to think about where this should go, but
5589 ## # somewhere we have to move over the fact that we want
5590 ## # to process all runs for each dataset that we're
5591 ## # considering. This basically means copying over the
5592 ## # information from self.datasets_information[]["runs"]
5593 ## # to self.datasets_to_use[].
5594 ## for dataset_name in self.datasets_to_use.keys():
5595 ## self.datasets_to_use[dataset_name] = self.datasets_information[dataset_name]["runs"]
5596 ## # TODO TODO TODO end
5597  # OBSOLETE OBSOLETE OBSOLETE end
5598 
5599  self.process_runs_use_and_ignore_lists()
5600 
5601  # If we've been asked to sacrifice some parts of
5602  # spread-out samples in order to be able to partially
5603  # harvest them, we'll do that here.
5604  if self.harvesting_mode == "single-step-allow-partial":
5605  self.singlify_datasets()
5606 
5607  # Check dataset name(s)
5608  self.check_dataset_list()
5609  # and see if there is anything left to do.
5610  if len(self.datasets_to_use) < 1:
5611  self.logger.info("After all checks etc. " \
5612  "there are no datasets (left?) " \
5613  "to process")
5614  else:
5615 
5616  self.logger.info("After all checks etc. we are left " \
5617  "with %d dataset(s) to process " \
5618  "for a total of %d runs" % \
5619  (len(self.datasets_to_use),
5620  sum([len(i) for i in \
5621  self.datasets_to_use.values()])))
5622 
5623  # NOTE: The order in which things are done here is
5624  # important. At the end of the job, independent of
5625  # how it ends (exception, CTRL-C, normal end) the
5626  # book keeping is written to file. At that time it
5627  # should be clear which jobs are done and can be
5628  # submitted. This means we first create the
5629  # general files, and then the per-job config
5630  # files.
5631 
5632  # TODO TODO TODO
5633  # It would be good to modify the book keeping a
5634  # bit. Now we write the crab.cfg (which is the
5635  # same for all samples and runs) and the
5636  # multicrab.cfg (which contains blocks for all
5637  # runs of all samples) without updating our book
5638  # keeping. The only place we update the book
5639  # keeping is after writing the harvesting config
5640  # file for a given dataset. Since there is only
5641  # one single harvesting configuration for each
5642  # dataset, we have no book keeping information on
5643  # a per-run basis.
5644  # TODO TODO TODO end
5645 
5646  # Check if the CASTOR output area exists. If
5647  # necessary create it.
5648  self.create_and_check_castor_dirs()
5649 
5650  # Create one crab and one multicrab configuration
5651  # for all jobs together.
5652  self.write_crab_config()
5653  self.write_multicrab_config()
5654 
5655  # Loop over all datasets and create harvesting
5656  # config files for all of them. One harvesting
5657  # config per dataset is enough. The same file will
5658  # be re-used by CRAB for each run.
5659  # NOTE: We always need a harvesting
5660  # configuration. For the two-step harvesting we
5661  # also need a configuration file for the first
5662  # step: the monitoring element extraction.
5663  for dataset_name in self.datasets_to_use.keys():
5664  try:
5665  self.write_harvesting_config(dataset_name)
5666  if self.harvesting_mode == "two-step":
5667  self.write_me_extraction_config(dataset_name)
5668  except:
5669  # Doh! Just re-raise the damn thing.
5670  raise
5671  else:
5672 ## tmp = self.datasets_information[dataset_name] \
5673 ## ["num_events"]
5674  tmp = {}
5675  for run_number in self.datasets_to_use[dataset_name]:
5676  tmp[run_number] = self.datasets_information \
5677  [dataset_name]["num_events"][run_number]
5678  if dataset_name in self.book_keeping_information:
5679  self.book_keeping_information[dataset_name].update(tmp)
5680  else:
5681  self.book_keeping_information[dataset_name] = tmp
5682 
5683  # Explain to the user what to do now.
5684  self.show_exit_message()
5685 
5686  except Usage as err:
5687  # self.logger.fatal(err.msg)
5688  # self.option_parser.print_help()
5689  pass
5690 
5691  except Error as err:
5692  # self.logger.fatal(err.msg)
5693  exit_code = 1
5694 
5695  except Exception as err:
5696  # Hmmm, ignore keyboard interrupts from the
5697  # user. These are not a `serious problem'. We also
5698  # skip SystemExit, which is the exception thrown when
5699  # one calls sys.exit(). This, for example, is done by
5700  # the option parser after calling print_help(). We
5701  # also have to catch all `no such option'
5702  # complaints. Everything else we catch here is a
5703  # `serious problem'.
5704  if isinstance(err, SystemExit):
5705  self.logger.fatal(err.code)
5706  elif not isinstance(err, KeyboardInterrupt):
5707  self.logger.fatal("!" * 50)
5708  self.logger.fatal(" This looks like a serious problem.")
5709  self.logger.fatal(" If you are sure you followed all " \
5710  "instructions")
5711  self.logger.fatal(" please copy the below stack trace together")
5712  self.logger.fatal(" with a description of what you were doing to")
5713  self.logger.fatal(" jeroen.hegeman@cern.ch.")
5714  self.logger.fatal(" %s" % self.ident_string())
5715  self.logger.fatal("!" * 50)
5716  self.logger.fatal(str(err))
5717  import traceback
5718  traceback_string = traceback.format_exc()
5719  for line in traceback_string.split("\n"):
5720  self.logger.fatal(line)
5721  self.logger.fatal("!" * 50)
5722  exit_code = 2
5723 
5724  # This is the stuff that we should really do, no matter
5725  # what. Of course cleaning up after ourselves is also done
5726  # from this place. This also keeps track of the book keeping
5727  # so far. (This means that if half of the configuration files
5728  # were created before e.g. the disk was full, we should still
5729  # have a consistent book keeping file.)
5730  finally:
5731 
5732  self.cleanup()
5733 
5734  ###
5735 
5736  if self.crab_submission == True:
5737  os.system("multicrab -create")
5738  os.system("multicrab -submit")
5739 
5740  # End of run.
5741  return exit_code
5742 
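The book-keeping dictionary filled per dataset in the loop above simply maps dataset names to per-run event counts; a hypothetical entry (all names and numbers made up):

# Illustrative shape of self.book_keeping_information after run().
book_keeping_information = {
    "/SomeDataset/SomeEra-vN/GEN-SIM-RECO": {
        142000: 12345,   # run number -> number of events
        142001: 67890,
        }
    }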
5743  # End of CMSHarvester.
5744 
5745 ###########################################################################
5746 ## Main entry point.
5747 ###########################################################################
5748 
5749 if __name__ == "__main__":
5750  "Main entry point for harvesting."
5751 
5752  CMSHarvester().run()
5753 
5754  # Done.
5755 
5756 ###########################################################################