
cmsHarvester.py
1 #!/usr/bin/env python
2 
3 ###########################################################################
4 ## File : cmsHarvester.py
5 ## Authors : Jeroen Hegeman (jeroen.hegeman@cern.ch)
6 ## Niklas Pietsch (niklas.pietsch@desy.de)
7 ## Francesco Costanza (francesco.costanza@desy.de)
8 ## Last change: 20100308
9 
14 
15 """Main program to run all kinds of harvesting.
16 
17 These are the basic kinds of harvesting implemented (contact me if
18 your favourite is missing):
19 
20 - RelVal : Run for release validation samples. Makes heavy use of MC
21  truth information.
22 
23 - RelValFS: FastSim RelVal.
24 
25 - MC : Run for MC samples.
26 
27 - DQMOffline : Run for real data (could also be run for MC).
28 
29 For the mappings of these harvesting types to sequence names please
30 see the setup_harvesting_info() and option_handler_list_types()
31 methods.
32 
33 """
34 from __future__ import print_function
35 
36 ###########################################################################
37 
38 __version__ = "3.8.2p1" # (version jump to match release)
39 __author__ = "Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
40  "Niklas Pietsch (niklas.pietsch@desy.de)"
41 
42 twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
43 
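# Illustrative usage sketch (not part of the original file). The command
# line options are registered further down in this file; the `run()' driver
# method is an assumption here and is only meant to show the intended flow:
#
#   from cmsHarvester import CMSHarvester
#   harvester = CMSHarvester(cmd_line_opts=["--no-t1access"])
#   harvester.run()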
44 ###########################################################################
45 
46 ###########################################################################
47 ## TODO list
48 
89 
90 import os
91 import sys
92 import commands
93 import re
94 import logging
95 import optparse
96 import datetime
97 import copy
98 from inspect import getargspec
99 from random import choice
100 
101 import six
102 
103 # These we need to communicate with DBS.
104 from DBSAPI.dbsApi import DbsApi
105 import DBSAPI.dbsException
107 from functools import reduce
108 # and these we need to parse the DBS output.
109 global xml
110 global SAXParseException
111 import xml.sax
112 from xml.sax import SAXParseException
113 
114 import Configuration.PyReleaseValidation
115 from Configuration.PyReleaseValidation.ConfigBuilder import \
116  ConfigBuilder, defaultOptions
117 # from Configuration.PyReleaseValidation.cmsDriverOptions import options, python_config_filename
118 
119 #import FWCore.ParameterSet.Config as cms
120 
121 # Debugging stuff.
122 import pdb
123 try:
124  import debug_hook
125 except ImportError:
126  pass
127 
128 ###########################################################################
129 ## Helper class: Usage exception.
130 ###########################################################################
131 class Usage(Exception):
132  def __init__(self, msg):
133  self.msg = msg
134  def __str__(self):
135  return repr(self.msg)
136 
137  # End of Usage.
138 
139 ###########################################################################
140 ## Helper class: Error exception.
141 ###########################################################################
142 class Error(Exception):
143  def __init__(self, msg):
144  self.msg = msg
145  def __str__(self):
146  return repr(self.msg)
147 
148 ###########################################################################
149 ## Helper class: CMSHarvesterHelpFormatter.
150 ###########################################################################
151 
152 class CMSHarvesterHelpFormatter(optparse.IndentedHelpFormatter):
153  """Helper class to add some customised help output to cmsHarvester.
154 
155  We want to add some instructions, as well as a pointer to the CMS
156  Twiki.
157 
158  """
159 
160  def format_usage(self, usage):
161 
162  usage_lines = []
163 
164  sep_line = "-" * 60
165  usage_lines.append(sep_line)
166  usage_lines.append("Welcome to the CMS harvester, a (hopefully useful)")
167  usage_lines.append("tool to create harvesting configurations.")
168  usage_lines.append("For more information please have a look at the CMS Twiki:")
169  usage_lines.append(" %s" % twiki_url)
170  usage_lines.append(sep_line)
171  usage_lines.append("")
172 
173  # Since we only add to the output, we now just append the
174  # original output from IndentedHelpFormatter.
175  usage_lines.append(optparse.IndentedHelpFormatter. \
176  format_usage(self, usage))
177 
178  formatted_usage = "\n".join(usage_lines)
179  return formatted_usage
180 
181  # End of CMSHarvesterHelpFormatter.
182 
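# Illustrative sketch (not part of the original file): the custom help
# formatter above is meant to be handed to optparse when the option parser
# is constructed, e.g.
#
#   parser = optparse.OptionParser(formatter=CMSHarvesterHelpFormatter())
#   parser.print_help()
#
# which prints the Twiki pointer banner followed by the normal usage text.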
183 ###########################################################################
184 ## Helper class: DBSXMLHandler.
185 ###########################################################################
186 
187 class DBSXMLHandler(xml.sax.handler.ContentHandler):
188  """XML handler class to parse DBS results.
189 
190  The tricky thing here is that older DBS versions (2.0.5 and
191  earlier) return results in a different XML format than newer
192  versions. Previously the result values were returned as attributes
193  to the `result' element. The new approach returns result values as
194  contents of named elements.
195 
196  The old approach is handled directly in startElement(), the new
197  approach in characters().
198 
199  NOTE: All results are returned in the form of string values of
200  course!
201 
202  """
203 
204  # This is the required mapping from the name of the variable we
205  # ask for to what we call it ourselves. (Effectively this is the
206  # mapping between the old attribute key name and the new element
207  # name.)
208  mapping = {
209  "dataset" : "PATH",
210  "dataset.tag" : "PROCESSEDDATASET_GLOBALTAG",
211  "datatype.type" : "PRIMARYDSTYPE_TYPE",
212  "run" : "RUNS_RUNNUMBER",
213  "run.number" : "RUNS_RUNNUMBER",
214  "file.name" : "FILES_LOGICALFILENAME",
215  "file.numevents" : "FILES_NUMBEROFEVENTS",
216  "algo.version" : "APPVERSION_VERSION",
217  "site" : "STORAGEELEMENT_SENAME",
218  }
219 
220  def __init__(self, tag_names):
221  # This is a list used as stack to keep track of where we are
222  # in the element tree.
223  self.element_position = []
224  self.tag_names = tag_names
225  self.results = {}
226 
227  def startElement(self, name, attrs):
228  self.element_position.append(name)
229 
230  self.current_value = []
231 
232  #----------
233 
234  # This is to catch results from DBS 2.0.5 and earlier.
235  if name == "result":
236  for name in self.tag_names:
237  key = DBSXMLHandler.mapping[name]
238  value = str(attrs[key])
239  try:
240  self.results[name].append(value)
241  except KeyError:
242  self.results[name] = [value]
243 
244  #----------
245 
246  def endElement(self, name):
247  assert self.current_element() == name, \
248  "closing unopened element `%s'" % name
249 
250  if self.current_element() in self.tag_names:
251  contents = "".join(self.current_value)
252  if self.current_element() in self.results:
253  self.results[self.current_element()].append(contents)
254  else:
255  self.results[self.current_element()] = [contents]
256 
257  self.element_position.pop()
258 
259  def characters(self, content):
260  # NOTE: It is possible that the actual contents of the tag
261  # gets split into multiple pieces. This method will be called
262  # for each of the pieces. This means we have to concatenate
263  # everything ourselves.
264  if self.current_element() in self.tag_names:
265  self.current_value.append(content)
266 
267  def current_element(self):
268  return self.element_position[-1]
269 
271  """Make sure that all results arrays have equal length.
272 
273  We should have received complete rows from DBS. I.e. all
274  results arrays in the handler should be of equal length.
275 
276  """
277 
278  results_valid = True
279 
280  res_names = self.results.keys()
281  if len(res_names) > 1:
282  for res_name in res_names[1:]:
283  res_tmp = self.results[res_name]
284  if len(res_tmp) != len(self.results[res_names[0]]):
285  results_valid = False
286 
287  return results_valid
288 
289  # End of DBSXMLHandler.
290 
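# Illustrative sketch (not part of the original file): how the handler above
# is typically wired into the SAX machinery. The tag names and the XML reply
# string are made up for the example:
#
#   handler = DBSXMLHandler(["dataset", "file.name"])
#   xml.sax.parseString(dbs_server_reply, handler)
#   # handler.results now maps each requested tag name to a list of values,
#   # one entry per result row returned by DBS, e.g.
#   #   {"dataset": ["/RelValTTbar/..."], "file.name": ["/store/..."]}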
291 ###########################################################################
292 ## CMSHarvester class.
293 ###########################################################################
294 
295 class CMSHarvester:
296  """Class to perform CMS harvesting.
297 
298  More documentation `obviously' to follow.
299 
300  """
301 
302  ##########
303 
304  def __init__(self, cmd_line_opts=None):
305  "Initialize class and process command line options."
306 
307  self.version = __version__
308 
309  # These are the harvesting types allowed. See the head of this
310  # file for more information.
311  self.harvesting_types = [
312  "RelVal",
313  "RelValFS",
314  "MC",
315  "DQMOffline",
316  ]
317 
318  # These are the possible harvesting modes:
319  # - Single-step: harvesting takes place on-site in a single
320  # step. For each sample only a single ROOT file containing
321  # the harvesting results is returned.
322  # - Single-step-allow-partial: special hack to allow
323  # harvesting of partial statistics using single-step
324  # harvesting on spread-out samples.
325  # - Two-step: harvesting takes place in two steps. The first
326  # step returns a series of monitoring element summaries for
327  # each sample. The second step then merges these summaries
328  # locally and does the real harvesting. This second step
329  # produces the ROOT file containing the harvesting results.
330  self.harvesting_modes = [
331  "single-step",
332  "single-step-allow-partial",
333  "two-step"
334  ]
335 
336  # It is possible to specify a GlobalTag that will override any
337  # choices (regarding GlobalTags) made by the cmsHarvester.
338  # BUG BUG BUG
339  # For the moment, until I figure out a way to obtain the
340  # GlobalTag with which a given data (!) dataset was created,
341  # it is necessary to specify a GlobalTag for harvesting of
342  # data.
343  # BUG BUG BUG end
344  self.globaltag = None
345 
346  # It's also possible to switch off the use of reference
347  # histograms altogether.
348  self.use_ref_hists = True
349 
350  # The database name and account are hard-coded. They are not
351  # likely to change before the end-of-life of this tool. But of
352  # course there is a way to override this from the command
353  # line. One can even override the Frontier connection used for
354  # the GlobalTag and for the reference histograms
355  # independently. Please only use this for testing purposes.
356  self.frontier_connection_name = {}
357  self.frontier_connection_name["globaltag"] = "frontier://" \
358  "FrontierProd/"
359  self.frontier_connection_name["refhists"] = "frontier://" \
360  "FrontierProd/"
361  self.frontier_connection_overridden = {}
362  for key in self.frontier_connection_name.keys():
363  self.frontier_connection_overridden[key] = False
364 
365  # This contains information specific to each of the harvesting
366  # types. Used to create the harvesting configuration. It is
367  # filled by setup_harvesting_info().
368  self.harvesting_info = None
369 
370  ###
371 
372  # These are default `unused' values that will be filled in
373  # depending on the command line options.
374 
375  # The type of harvesting we're doing. See
376  # self.harvesting_types for allowed types.
377  self.harvesting_type = None
378 
379  # The harvesting mode, popularly known as single-step
380  # vs. two-step. The thing to remember at this point is that
381  # single-step is only possible for samples located completely
382  # at a single site (i.e. SE).
383  self.harvesting_mode = None
384  # BUG BUG BUG
385  # Default temporarily set to two-step until we can get staged
386  # jobs working with CRAB.
387  self.harvesting_mode_default = "single-step"
388  # BUG BUG BUG end
389 
390  # The input method: are we reading a dataset name (or regexp)
391  # directly from the command line or are we reading a file
392  # containing a list of dataset specifications. Actually we
393  # keep one of each for both datasets and runs.
394  self.input_method = {}
395  self.input_method["datasets"] = {}
396  self.input_method["datasets"]["use"] = None
397  self.input_method["datasets"]["ignore"] = None
398  self.input_method["runs"] = {}
399  self.input_method["runs"]["use"] = None
400  self.input_method["runs"]["ignore"] = None
402  # The name of whatever input we're using.
403  self.input_name = {}
404  self.input_name["datasets"] = {}
405  self.input_name["datasets"]["use"] = None
406  self.input_name["datasets"]["ignore"] = None
407  self.input_name["runs"] = {}
408  self.input_name["runs"]["use"] = None
409  self.input_name["runs"]["ignore"] = None
410 
411  self.Jsonlumi = False
412  self.Jsonfilename = "YourJSON.txt"
413  self.Jsonrunfilename = "YourJSON.txt"
414  self.todofile = "YourToDofile.txt"
415 
416  # If this is true, we're running in `force mode'. In this case
417  # the sanity checks are performed but failure will not halt
418  # everything.
419  self.force_running = None
420 
421  # The base path of the output dir in CASTOR.
422  self.castor_base_dir = None
423  self.castor_base_dir_default = "/castor/cern.ch/" \
424  "cms/store/temp/" \
425  "dqm/offline/harvesting_output/"
426 
427  # The name of the file to be used for book keeping: which
428  # datasets, runs, etc. we have already processed.
429  self.book_keeping_file_name = None
430  self.book_keeping_file_name_default = "harvesting_accounting.txt"
431 
432  # The dataset name to reference histogram name mapping is read
433  # from a text file. The name of this file is kept in the
434  # following variable.
435  self.ref_hist_mappings_file_name = None
436  # And this is the default value.
437  self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"
438 
439  # Hmmm, hard-coded prefix of the CERN CASTOR area. This is the
440  # only supported CASTOR area.
441  # NOTE: Make sure this one starts with a `/'.
442  self.castor_prefix = "/castor/cern.ch"
443 
444  # Normally the central harvesting should be done using the
445  # `t1access' grid role. To be able to run without T1 access
446  # the --no-t1access flag can be used. This variable keeps
447  # track of that special mode.
448  self.non_t1access = False
449  self.caf_access = False
450  self.saveByLumiSection = False
451  self.crab_submission = False
452  self.nr_max_sites = 1
453 
454  self.preferred_site = "no preference"
455 
456  # This will become the list of datasets and runs to consider
457  self.datasets_to_use = {}
458  # and this will become the list of datasets and runs to skip.
459  self.datasets_to_ignore = {}
460  # This, in turn, will hold all book keeping information.
461  self.book_keeping_information = {}
462  # And this is where the dataset name to reference histogram
463  # name mapping is stored.
464  self.ref_hist_mappings = {}
465 
466  # We're now also allowing run selection. This means we also
467  # have to keep list of runs requested and vetoed by the user.
468  self.runs_to_use = {}
469  self.runs_to_ignore = {}
470 
471  # Cache for CMSSW version availability at different sites.
472  self.sites_and_versions_cache = {}
473 
474  # Cache for checked GlobalTags.
476 
477  # Global flag to see if there were any jobs for which we could
478  # not find a matching site.
479  self.all_sites_found = True
480 
481  # Helper string centrally defined.
482  self.no_matching_site_found_str = "no_matching_site_found"
483 
484  # Store command line options for later use.
485  if cmd_line_opts is None:
486  cmd_line_opts = sys.argv[1:]
487  self.cmd_line_opts = cmd_line_opts
488 
489  # Set up the logger.
490  log_handler = logging.StreamHandler()
491  # This is the default log formatter, the debug option switches
492  # on some more information.
493  log_formatter = logging.Formatter("%(message)s")
494  log_handler.setFormatter(log_formatter)
495  logger = logging.getLogger()
496  logger.name = "main"
497  logger.addHandler(log_handler)
498  self.logger = logger
499  # The default output mode is quite verbose.
500  self.set_output_level("NORMAL")
501 
502  #logger.debug("Initialized successfully")
503 
504  # End of __init__.
505 
506  ##########
507 
508  def cleanup(self):
509  "Clean up after ourselves."
510 
511  # NOTE: This is the safe replacement of __del__.
512 
513  #self.logger.debug("All done -> shutting down")
514  logging.shutdown()
515 
516  # End of cleanup.
517 
518  ##########
519 
520  def time_stamp(self):
521  "Create a timestamp to use in the created config files."
522 
523  time_now = datetime.datetime.utcnow()
524  # We don't care about the microseconds.
525  time_now = time_now.replace(microsecond = 0)
526  time_stamp = "%sUTC" % datetime.datetime.isoformat(time_now)
527 
528  # End of time_stamp.
529  return time_stamp
530 
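# Illustrative example (not part of the original file): the timestamp
# produced above looks like
#
#   2010-03-08T14:00:00UTC
#
# i.e. the ISO format of the UTC time with the microseconds dropped and a
# literal "UTC" appended.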
531  ##########
532 
533  def ident_string(self):
534  "Spit out an identification string for cmsHarvester.py."
535 
536  ident_str = "`cmsHarvester.py " \
537  "version %s': cmsHarvester.py %s" % \
538  (__version__,
539  reduce(lambda x, y: x+' '+y, sys.argv[1:]))
540 
541  return ident_str
542 
543  ##########
544 
545  def format_conditions_string(self, globaltag):
546  """Create the conditions string needed for `cmsDriver'.
547 
548  Just glueing the FrontierConditions bit in front of it really.
549 
550  """
551 
552  # Not very robust but okay. The idea is that if the user
553  # specified (since this cannot happen with GlobalTags coming
554  # from DBS) something containing `conditions', they probably
555  # know what they're doing and we should not muck things up. In
556  # all other cases we just assume we only received the
557  # GlobalTag part and we build the usual conditions string from
558  # that.
559  if globaltag.lower().find("conditions") > -1:
560  conditions_string = globaltag
561  else:
562  conditions_string = "FrontierConditions_GlobalTag,%s" % \
563  globaltag
564 
565  # End of format_conditions_string.
566  return conditions_string
567 
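# Illustrative example (not part of the original file); the GlobalTag name
# is made up:
#
#   format_conditions_string("GR09_R_34X_V2::All")
#     -> "FrontierConditions_GlobalTag,GR09_R_34X_V2::All"
#   format_conditions_string("FrontierConditions_GlobalTag,GR09_R_34X_V2::All")
#     -> returned unchanged, since the string already contains `conditions'.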
568  ##########
569 
570  def db_account_name_cms_cond_globaltag(self):
571  """Return the database account name used to store the GlobalTag.
572 
573  The name of the database account depends (albeit weakly) on
574  the CMSSW release version.
575 
576  """
577 
578  # This never changed, unlike the cms_cond_31X_DQM_SUMMARY ->
579  # cms_cond_34X_DQM transition.
580  account_name = "CMS_COND_31X_GLOBALTAG"
581 
582  # End of db_account_name_cms_cond_globaltag.
583  return account_name
584 
585  ##########
586 
587  def db_account_name_cms_cond_dqm_summary(self):
588  """See db_account_name_cms_cond_globaltag."""
589 
590  account_name = None
591  version = self.cmssw_version[6:11]
592  if version < "3_4_0":
593  account_name = "CMS_COND_31X_DQM_SUMMARY"
594  else:
595  account_name = "CMS_COND_34X"
596 
597  # End of db_account_name_cms_cond_dqm_summary.
598  return account_name
599 
600  ##########
601 
602  def config_file_header(self):
603  "Create a nice header to be used to mark the generated files."
604 
605  tmp = []
606 
607  time_stamp = self.time_stamp()
608  ident_str = self.ident_string()
609  tmp.append("# %s" % time_stamp)
610  tmp.append("# WARNING: This file was created automatically!")
611  tmp.append("")
612  tmp.append("# Created by %s" % ident_str)
613 
614  header = "\n".join(tmp)
615 
616  # End of config_file_header.
617  return header
618 
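# Illustrative example (not part of the original file): the generated header
# looks something like
#
#   # 2010-03-08T14:00:00UTC
#   # WARNING: This file was created automatically!
#
#   # Created by `cmsHarvester.py version 3.8.2p1': cmsHarvester.py <options>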
619  ##########
620 
621  def set_output_level(self, output_level):
622  """Adjust the level of output generated.
623 
624  Choices are:
625  - normal : default level of output
626  - quiet : less output than the default
627  - verbose : some additional information
628  - debug : lots more information, may be overwhelming
629 
630  NOTE: The debug option is a bit special in the sense that it
631  also modifies the output format.
632 
633  """
634 
635  # NOTE: These levels are hooked up to the ones used in the
636  # logging module.
637  output_levels = {
638  "NORMAL" : logging.INFO,
639  "QUIET" : logging.WARNING,
640  "VERBOSE" : logging.INFO,
641  "DEBUG" : logging.DEBUG
642  }
643 
644  output_level = output_level.upper()
645 
646  try:
647  # Update the logger.
648  self.log_level = output_levels[output_level]
649  self.logger.setLevel(self.log_level)
650  except KeyError:
651  # Show a complaint
652  self.logger.fatal("Unknown output level `%s'" % output_level)
653  # and re-raise an exception.
654  raise Exception
655 
656  # End of set_output_level.
657 
658  ##########
659 
660  def option_handler_debug(self, option, opt_str, value, parser):
661  """Switch to debug mode.
662 
663  This both increases the amount of output generated, as well as
664  changes the format used (more detailed information is given).
665 
666  """
667 
668  # Switch to a more informative log formatter for debugging.
669  log_formatter_debug = logging.Formatter("[%(levelname)s] " \
670  # NOTE: funcName was
671  # only implemented
672  # starting with python
673  # 2.5.
674  #"%(funcName)s() " \
675  #"@%(filename)s:%(lineno)d " \
676  "%(message)s")
677  # Hmmm, not very nice. This assumes there's only a single
678  # handler associated with the current logger.
679  log_handler = self.logger.handlers[0]
680  log_handler.setFormatter(log_formatter_debug)
681  self.set_output_level("DEBUG")
682 
683  # End of option_handler_debug.
684 
685  ##########
686 
687  def option_handler_quiet(self, option, opt_str, value, parser):
688  "Switch to quiet mode: less verbose."
689 
690  self.set_output_level("QUIET")
691 
692  # End of option_handler_quiet.
693 
694  ##########
695 
696  def option_handler_force(self, option, opt_str, value, parser):
697  """Switch on `force mode' in which case we don't brake for nobody.
698 
699  In so-called `force mode' all sanity checks are performed but
700  we don't halt on failure. Of course this requires some care
701  from the user.
702 
703  """
704 
705  self.logger.debug("Switching on `force mode'.")
706  self.force_running = True
707 
708  # End of option_handler_force.
709 
710  ##########
711 
712  def option_handler_harvesting_type(self, option, opt_str, value, parser):
713  """Set the harvesting type to be used.
714 
715  This checks that no harvesting type is already set, and sets
716  the harvesting type to be used to the one specified. If a
717  harvesting type is already set an exception is thrown. The
718  same happens when an unknown type is specified.
719 
720  """
721 
722  # Check for (in)valid harvesting types.
723  # NOTE: The matching is done in a bit of a complicated
724  # way. This allows the specification of the type to be
725  # case-insensitive while still ending up with the properly
726  # `cased' version afterwards.
727  value = value.lower()
728  harvesting_types_lowered = [i.lower() for i in self.harvesting_types]
729  try:
730  type_index = harvesting_types_lowered.index(value)
731  # If this works, we now have the index to the `properly
732  # cased' version of the harvesting type.
733  except ValueError:
734  self.logger.fatal("Unknown harvesting type `%s'" % \
735  value)
736  self.logger.fatal(" possible types are: %s" %
737  ", ".join(self.harvesting_types))
738  raise Usage("Unknown harvesting type `%s'" % \
739  value)
740 
741  # Check if multiple (by definition conflicting) harvesting
742  # types are being specified.
743  if not self.harvesting_type is None:
744  msg = "Only one harvesting type should be specified"
745  self.logger.fatal(msg)
746  raise Usage(msg)
747  self.harvesting_type = self.harvesting_types[type_index]
748 
749  self.logger.info("Harvesting type to be used: `%s'" % \
750  self.harvesting_type)
751 
752  # End of option_handler_harvesting_type.
753 
754  ##########
755 
756  def option_handler_harvesting_mode(self, option, opt_str, value, parser):
757  """Set the harvesting mode to be used.
758 
759  Single-step harvesting can be used for samples that are
760  located completely at a single site (= SE). Otherwise use
761  two-step mode.
762 
763  """
764 
765  # Check for valid mode.
766  harvesting_mode = value.lower()
767  if not harvesting_mode in self.harvesting_modes:
768  msg = "Unknown harvesting mode `%s'" % harvesting_mode
769  self.logger.fatal(msg)
770  self.logger.fatal(" possible modes are: %s" % \
771  ", ".join(self.harvesting_modes))
772  raise Usage(msg)
773 
774  # Check if we've been given only a single mode, otherwise
775  # complain.
776  if not self.harvesting_mode is None:
777  msg = "Only one harvesting mode should be specified"
778  self.logger.fatal(msg)
779  raise Usage(msg)
780  self.harvesting_mode = harvesting_mode
781 
782  self.logger.info("Harvesting mode to be used: `%s'" % \
783  self.harvesting_mode)
784 
785  # End of option_handler_harvesting_mode.
786 
787  ##########
788 
789  def option_handler_globaltag(self, option, opt_str, value, parser):
790  """Set the GlobalTag to be used, overriding our own choices.
791 
792  By default the cmsHarvester will use the GlobalTag with which
793  a given dataset was created also for the harvesting. The
794  --globaltag option is the way to override this behaviour.
795 
796  """
797 
798  # Make sure that this flag only occurred once.
799  if not self.globaltag is None:
800  msg = "Only one GlobalTag should be specified"
801  self.logger.fatal(msg)
802  raise Usage(msg)
803  self.globaltag = value
804 
805  self.logger.info("GlobalTag to be used: `%s'" % \
806  self.globaltag)
807 
808  # End of option_handler_globaltag.
809 
810  ##########
811 
812  def option_handler_no_ref_hists(self, option, opt_str, value, parser):
813  "Switch use of all reference histograms off."
814 
815  self.use_ref_hists = False
816 
817  self.logger.warning("Switching off all use of reference histograms")
818 
819  # End of option_handler_no_ref_hists.
820 
821  ##########
822 
823  def option_handler_frontier_connection(self, option, opt_str,
824  value, parser):
825  """Override the default Frontier connection string.
826 
827  Please only use this for testing (e.g. when a test payload has
828  been inserted into cms_orc_off instead of cms_orc_on).
829 
830  This method gets called for three different command line
831  options:
832  - --frontier-connection,
833  - --frontier-connection-for-globaltag,
834  - --frontier-connection-for-refhists.
835  Appropriate care has to be taken to make sure things are only
836  specified once.
837 
838  """
839 
840  # Figure out with which command line option we've been called.
841  frontier_type = opt_str.split("-")[-1]
842  if frontier_type == "connection":
843  # Main option: change all connection strings.
844  frontier_types = self.frontier_connection_name.keys()
845  else:
846  frontier_types = [frontier_type]
847 
848  # Make sure that each Frontier connection is specified only
849  # once. (Okay, in a bit of a dodgy way...)
850  for connection_name in frontier_types:
851  if self.frontier_connection_overridden[connection_name] == True:
852  msg = "Please specify either:\n" \
853  " `--frontier-connection' to change the " \
854  "Frontier connection used for everything, or\n" \
855  "either one or both of\n" \
856  " `--frontier-connection-for-globaltag' to " \
857  "change the Frontier connection used for the " \
858  "GlobalTag and\n" \
859  " `--frontier-connection-for-refhists' to change " \
860  "the Frontier connection used for the " \
861  "reference histograms."
862  self.logger.fatal(msg)
863  raise Usage(msg)
864 
865  frontier_prefix = "frontier://"
866  if not value.startswith(frontier_prefix):
867  msg = "Expecting Frontier connections to start with " \
868  "`%s'. You specified `%s'." % \
869  (frontier_prefix, value)
870  self.logger.fatal(msg)
871  raise Usage(msg)
872  # We also kind of expect this to be either FrontierPrep or
873  # FrontierProd (but this is just a warning).
874  if value.find("FrontierProd") < 0 and \
875  value.find("FrontierPrep") < 0:
876  msg = "Expecting Frontier connections to contain either " \
877  "`FrontierProd' or `FrontierPrep'. You specified " \
878  "`%s'. Are you sure?" % \
879  value
880  self.logger.warning(msg)
881 
882  if not value.endswith("/"):
883  value += "/"
884 
885  for connection_name in frontier_types:
886  self.frontier_connection_name[connection_name] = value
887  self.frontier_connection_overridden[connection_name] = True
888 
889  frontier_type_str = "unknown"
890  if connection_name == "globaltag":
891  frontier_type_str = "the GlobalTag"
892  elif connection_name == "refhists":
893  frontier_type_str = "the reference histograms"
894 
895  self.logger.warning("Overriding default Frontier " \
896  "connection for %s " \
897  "with `%s'" % \
898  (frontier_type_str,
899  self.frontier_connection_name[connection_name]))
900 
901  # End of option_handler_frontier_connection
902 
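# Illustrative example (not part of the original file): the option string
# itself selects which connection is overridden, e.g.
#
#   "--frontier-connection-for-globaltag".split("-")[-1]  ->  "globaltag"
#   "--frontier-connection".split("-")[-1]  ->  "connection"
#
# so the plain `--frontier-connection' option falls into the first branch
# above and overrides all known connection names at once.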
903  ##########
904 
905  def option_handler_input_todofile(self, option, opt_str, value, parser):
906 
907  self.todofile = value
908  # End of option_handler_input_todofile.
909 
910  ##########
911 
912  def option_handler_input_Jsonfile(self, option, opt_str, value, parser):
913 
914  self.Jsonfilename = value
915  # End of option_handler_input_Jsonfile.
916 
917  ##########
918 
919  def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser):
920 
921  self.Jsonrunfilename = value
922  # End of option_handler_input_Jsonrunfile.
923 
924  ##########
925 
926  def option_handler_input_spec(self, option, opt_str, value, parser):
927  """Store the input method and name for dataset/run selection.
928  Handles both the `use these' and the `ignore these' variants,
929  for datasets as well as for runs, and checks that only one
930  input method is specified per case."""
931 
932  # Figure out if we were called for the `use these' or the
933  # `ignore these' case.
934  if opt_str.lower().find("ignore") > -1:
935  spec_type = "ignore"
936  else:
937  spec_type = "use"
938 
939  # Similar: are we being called for datasets or for runs?
940  if opt_str.lower().find("dataset") > -1:
941  select_type = "datasets"
942  else:
943  select_type = "runs"
944 
945  if not self.input_method[select_type][spec_type] is None:
946  msg = "Please only specify one input method " \
947  "(for the `%s' case)" % opt_str
948  self.logger.fatal(msg)
949  raise Usage(msg)
950 
951  input_method = opt_str.replace("-", "").replace("ignore", "")
952  self.input_method[select_type][spec_type] = input_method
953  self.input_name[select_type][spec_type] = value
954 
955  self.logger.debug("Input method for the `%s' case: %s" % \
956  (spec_type, input_method))
957 
958  # End of option_handler_input_spec
959 
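# Illustrative example (not part of the original file); the exact option
# strings are assumptions, but the string handling above works like this:
#
#   opt_str = "--dataset-ignore"
#   spec_type = "ignore"; select_type = "datasets"
#   input_method = opt_str.replace("-", "").replace("ignore", "")  # -> "dataset"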
960  ##########
961 
962  def option_handler_book_keeping_file(self, option, opt_str, value, parser):
963  """Store the name of the file to be used for book keeping.
964 
965  The only check done here is that only a single book keeping
966  file is specified.
967 
968  """
969 
970  file_name = value
971 
972  if not self.book_keeping_file_name is None:
973  msg = "Only one book keeping file should be specified"
974  self.logger.fatal(msg)
975  raise Usage(msg)
976  self.book_keeping_file_name = file_name
977 
978  self.logger.info("Book keeping file to be used: `%s'" % \
979  self.book_keeping_file_name)
980 
981  # End of option_handler_book_keeping_file.
982 
983  ##########
984 
985  def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser):
986  """Store the name of the file for the ref. histogram mapping.
987 
988  """
989 
990  file_name = value
991 
992  if not self.ref_hist_mappings_file_name is None:
993  msg = "Only one reference histogram mapping file " \
994  "should be specified"
995  self.logger.fatal(msg)
996  raise Usage(msg)
997  self.ref_hist_mappings_file_name = file_name
998 
999  self.logger.info("Reference histogram mapping file " \
1000  "to be used: `%s'" % \
1001  self.ref_hist_mappings_file_name)
1002 
1003  # End of option_handler_ref_hist_mapping_file.
1004 
1005  ##########
1006 
1007  # OBSOLETE OBSOLETE OBSOLETE
1008 
1009 ## def option_handler_dataset_name(self, option, opt_str, value, parser):
1010 ## """Specify the name(s) of the dataset(s) to be processed.
1011 
1012 ## It is checked to make sure that no dataset name or listfile
1013 ## names are given yet. If all is well (i.e. we still have a
1014 ## clean slate) the dataset name is stored for later use,
1015 ## otherwise a Usage exception is raised.
1016 
1017 ## """
1018 
1019 ## if not self.input_method is None:
1020 ## if self.input_method == "dataset":
1021 ## raise Usage("Please only feed me one dataset specification")
1022 ## elif self.input_method == "listfile":
1023 ## raise Usage("Cannot specify both dataset and input list file")
1024 ## else:
1025 ## assert False, "Unknown input method `%s'" % self.input_method
1026 ## self.input_method = "dataset"
1027 ## self.input_name = value
1028 ## self.logger.info("Input method used: %s" % self.input_method)
1029 
1030 ## # End of option_handler_dataset_name.
1031 
1032 ## ##########
1033 
1034 ## def option_handler_listfile_name(self, option, opt_str, value, parser):
1035 ## """Specify the input list file containing datasets to be processed.
1036 
1037 ## It is checked to make sure that no dataset name or listfile
1038 ## names are given yet. If all is well (i.e. we still have a
1039 ## clean slate) the listfile name is stored for later use,
1040 ## otherwise a Usage exception is raised.
1041 
1042 ## """
1043 
1044 ## if not self.input_method is None:
1045 ## if self.input_method == "listfile":
1046 ## raise Usage("Please only feed me one list file")
1047 ## elif self.input_method == "dataset":
1048 ## raise Usage("Cannot specify both dataset and input list file")
1049 ## else:
1050 ## assert False, "Unknown input method `%s'" % self.input_method
1051 ## self.input_method = "listfile"
1052 ## self.input_name = value
1053 ## self.logger.info("Input method used: %s" % self.input_method)
1054 
1055 ## # End of option_handler_listfile_name.
1056 
1057  # OBSOLETE OBSOLETE OBSOLETE end
1058 
1059  ##########
1060 
1061  def option_handler_castor_dir(self, option, opt_str, value, parser):
1062  """Specify where on CASTOR the output should go.
1063 
1064  At the moment only output to CERN CASTOR is
1065  supported. Eventually the harvested results should go into the
1066  central place for DQM on CASTOR anyway.
1067 
1068  """
1069 
1070  # Check format of specified CASTOR area.
1071  castor_dir = value
1072  #castor_dir = castor_dir.lstrip(os.path.sep)
1073  castor_prefix = self.castor_prefix
1074 
1075  # Add a leading slash if necessary and clean up the path.
1076  castor_dir = os.path.join(os.path.sep, castor_dir)
1077  self.castor_base_dir = os.path.normpath(castor_dir)
1078 
1079  self.logger.info("CASTOR (base) area to be used: `%s'" % \
1080  self.castor_base_dir)
1081 
1082  # End of option_handler_castor_dir.
1083 
1084  ##########
1085 
1086  def option_handler_no_t1access(self, option, opt_str, value, parser):
1087  """Set the self.non_t1access flag to try and create jobs that
1088  run without special `t1access' role.
1089 
1090  """
1091 
1092  self.non_t1access = True
1093 
1094  self.logger.warning("Running in `non-t1access' mode. " \
1095  "Will try to create jobs that run " \
1096  "without special rights but no " \
1097  "further promises...")
1098 
1099  # End of option_handler_no_t1access.
1100 
1101  ##########
1102 
1103  def option_handler_caf_access(self, option, opt_str, value, parser):
1104  """Set the self.caf_access flag to try and create jobs that
1105  run on the CAF.
1106 
1107  """
1108  self.caf_access = True
1109 
1110  self.logger.warning("Running in `caf_access' mode. " \
1111  "Will try to create jobs that run " \
1112  "on CAF but no " \
1113  "further promises...")
1114 
1115  # End of option_handler_caf_access.
1116 
1117  ##########
1118 
1119  def option_handler_saveByLumiSection(self, option, opt_str, value, parser):
1120  """Set process.dqmSaver.saveByLumiSection=1 in the harvesting config file
1121  """
1122  self.saveByLumiSection = True
1123 
1124  self.logger.warning("Switching on saveByLumiSection: setting process.dqmSaver.saveByLumiSection = 1 in the harvesting configuration")
1125 
1126  # End of option_handler_saveByLumiSection.
1127 
1128 
1129  ##########
1130 
1131  def option_handler_crab_submission(self, option, opt_str, value, parser):
1132  """Switch on automatic submission of the created CRAB jobs.
1133 
1134  """
1135  self.crab_submission = True
1136 
1137  # End of option_handler_crab_submission.
1138 
1139  ##########
1140 
1141  def option_handler_sites(self, option, opt_str, value, parser):
1142 
1143  self.nr_max_sites = value
1144 
1145  ##########
1146 
1147  def option_handler_preferred_site(self, option, opt_str, value, parser):
1148 
1149  self.preferred_site = value
1150 
1151  ##########
1152 
1153  def option_handler_list_types(self, option, opt_str, value, parser):
1154  """List all harvesting types and their mappings.
1155 
1156  This lists all implemented harvesting types with their
1157  corresponding mappings to sequence names. This had to be
1158  separated out from the help since it depends on the CMSSW
1159  version and was making things a bit of a mess.
1160 
1161  NOTE: There is no way (at least not that I could come up with)
1162  to code this in a neat generic way that can be read both by
1163  this method and by setup_harvesting_info(). Please try hard to
1164  keep these two methods in sync!
1165 
1166  """
1167 
1168  sep_line = "-" * 50
1169  sep_line_short = "-" * 20
1170 
1171  print(sep_line)
1172  print("The following harvesting types are available:")
1173  print(sep_line)
1174 
1175  print("`RelVal' maps to:")
1176  print(" pre-3_3_0 : HARVESTING:validationHarvesting")
1177  print(" 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting")
1178  print(" Exceptions:")
1179  print(" 3_3_0_pre1-4 : HARVESTING:validationHarvesting")
1180  print(" 3_3_0_pre6 : HARVESTING:validationHarvesting")
1181  print(" 3_4_0_pre1 : HARVESTING:validationHarvesting")
1182 
1183  print(sep_line_short)
1184 
1185  print("`RelValFS' maps to:")
1186  print(" always : HARVESTING:validationHarvestingFS")
1187 
1188  print(sep_line_short)
1189 
1190  print("`MC' maps to:")
1191  print(" always : HARVESTING:validationprodHarvesting")
1192 
1193  print(sep_line_short)
1194 
1195  print("`DQMOffline' maps to:")
1196  print(" always : HARVESTING:dqmHarvesting")
1197 
1198  print(sep_line)
1199 
1200  # We're done, let's quit. (This is the same thing optparse
1201  # does after printing the help.)
1202  raise SystemExit
1203 
1204  # End of option_handler_list_types.
1205 
1206  ##########
1207 
1208  def setup_harvesting_info(self):
1209  """Fill our dictionary with all info needed to understand
1210  harvesting.
1211 
1212  This depends on the CMSSW version since at some point the
1213  names and sequences were modified.
1214 
1215  NOTE: There is no way (at least not that I could come up with)
1216  to code this in a neat generic way that can be read both by
1217  this method and by option_handler_list_types(). Please try
1218  hard to keep these two methods in sync!
1219 
1220  """
1221 
1222  assert not self.cmssw_version is None, \
1223  "ERROR setup_harvesting_info() requires " \
1224  "self.cmssw_version to be set!!!"
1225 
1226  harvesting_info = {}
1227 
1228  # This is the version-independent part.
1229  harvesting_info["DQMOffline"] = {}
1230  harvesting_info["DQMOffline"]["beamspot"] = None
1231  harvesting_info["DQMOffline"]["eventcontent"] = None
1232  harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"
1233 
1234  harvesting_info["RelVal"] = {}
1235  harvesting_info["RelVal"]["beamspot"] = None
1236  harvesting_info["RelVal"]["eventcontent"] = None
1237  harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"
1238 
1239  harvesting_info["RelValFS"] = {}
1240  harvesting_info["RelValFS"]["beamspot"] = None
1241  harvesting_info["RelValFS"]["eventcontent"] = None
1242  harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"
1243 
1244  harvesting_info["MC"] = {}
1245  harvesting_info["MC"]["beamspot"] = None
1246  harvesting_info["MC"]["eventcontent"] = None
1247  harvesting_info["MC"]["harvesting"] = "AtRunEnd"
1248 
1249  # This is the version-dependent part. And I know, strictly
1250  # speaking it's not necessary to fill in all three types since
1251  # in a single run we'll only use one type anyway. This does
1252  # look more readable, however, and required less thought from
1253  # my side when I put this together.
1254 
1255  # DEBUG DEBUG DEBUG
1256  # Check that we understand our own version naming.
1257  assert self.cmssw_version.startswith("CMSSW_")
1258  # DEBUG DEBUG DEBUG end
1259 
1260  version = self.cmssw_version[6:]
1261 
1262  #----------
1263 
1264  # RelVal
1265  step_string = None
1266  if version < "3_3_0":
1267  step_string = "validationHarvesting"
1268  elif version in ["3_3_0_pre1", "3_3_0_pre2",
1269  "3_3_0_pre3", "3_3_0_pre4",
1270  "3_3_0_pre6", "3_4_0_pre1"]:
1271  step_string = "validationHarvesting"
1272  else:
1273  step_string = "validationHarvesting+dqmHarvesting"
1274 
1275  harvesting_info["RelVal"]["step_string"] = step_string
1276 
1277  # DEBUG DEBUG DEBUG
1278  # Let's make sure we found something.
1279  assert not step_string is None, \
1280  "ERROR Could not decide a RelVal harvesting sequence " \
1281  "for CMSSW version %s" % self.cmssw_version
1282  # DEBUG DEBUG DEBUG end
1283 
1284  #----------
1285 
1286  # RelValFS
1287  step_string = "validationHarvestingFS"
1288 
1289  harvesting_info["RelValFS"]["step_string"] = step_string
1290 
1291  #----------
1292 
1293  # MC
1294  step_string = "validationprodHarvesting"
1295 
1296  harvesting_info["MC"]["step_string"] = step_string
1297 
1298  # DEBUG DEBUG DEBUG
1299  # Let's make sure we found something.
1300  assert not step_string is None, \
1301  "ERROR Could not decide a MC harvesting " \
1302  "sequence for CMSSW version %s" % self.cmssw_version
1303  # DEBUG DEBUG DEBUG end
1304 
1305  #----------
1306 
1307  # DQMOffline
1308  step_string = "dqmHarvesting"
1309 
1310  harvesting_info["DQMOffline"]["step_string"] = step_string
1311 
1312  #----------
1313 
1314  self.harvesting_info = harvesting_info
1315 
1316  self.logger.info("Based on the CMSSW version (%s) " \
1317  "I decided to use the `HARVESTING:%s' " \
1318  "sequence for %s harvesting" % \
1319  (self.cmssw_version,
1320  self.harvesting_info[self.harvesting_type]["step_string"],
1321  self.harvesting_type))
1322 
1323  # End of setup_harvesting_info.
1324 
1325  ##########
1326 
1327  def create_castor_path_name_common(self, dataset_name):
1328  """Build the common part of the output path to be used on
1329  CASTOR.
1330 
1331  This consists of the CASTOR area base path specified by the
1332  user and a piece depending on the data type (data vs. MC), the
1333  harvesting type and the dataset name followed by a piece
1334  containing the run number and event count. (See comments in
1335  create_castor_path_name_special for details.) This method
1336  creates the common part, without run number and event count.
1337 
1338  """
1339 
1340  castor_path = self.castor_base_dir
1341 
1342  ###
1343 
1344  # The data type: data vs. mc.
1345  datatype = self.datasets_information[dataset_name]["datatype"]
1346  datatype = datatype.lower()
1347  castor_path = os.path.join(castor_path, datatype)
1348 
1349  # The harvesting type.
1350  harvesting_type = self.harvesting_type
1351  harvesting_type = harvesting_type.lower()
1352  castor_path = os.path.join(castor_path, harvesting_type)
1353 
1354  # The CMSSW release version (only the `digits'). Note that the
1355  # CMSSW version used here is the version used for harvesting,
1356  # not the one from the dataset. This does make the results
1357  # slightly harder to find. On the other hand it solves
1358  # problems in case one re-harvests a given dataset with a
1359  # different CMSSW version, which would lead to ambiguous path
1360  # names. (Of course for many cases the harvesting is done with
1361  # the same CMSSW version the dataset was created with.)
1362  release_version = self.cmssw_version
1363  release_version = release_version.lower(). \
1364  replace("cmssw", ""). \
1365  strip("_")
1366  castor_path = os.path.join(castor_path, release_version)
1367 
1368  # The dataset name.
1369  dataset_name_escaped = self.escape_dataset_name(dataset_name)
1370  castor_path = os.path.join(castor_path, dataset_name_escaped)
1371 
1372  ###
1373 
1374  castor_path = os.path.normpath(castor_path)
1375 
1376  # End of create_castor_path_name_common.
1377  return castor_path
1378 
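# Illustrative example (not part of the original file); the escaped dataset
# name depends on escape_dataset_name(), which is defined elsewhere in this
# file, so the last path piece below is only indicative:
#
#   <castor_base_dir>/mc/relval/3_8_2/<escaped_dataset_name>
#
# i.e. base dir + data type + harvesting type + CMSSW version digits +
# dataset name.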
1379  ##########
1380 
1381  def create_castor_path_name_special(self,
1382  dataset_name, run_number,
1383  castor_path_common):
1384  """Create the specialised part of the CASTOR output dir name.
1385 
1386  NOTE: To avoid clashes with `incremental harvesting'
1387  (re-harvesting when a dataset grows) we have to include the
1388  event count in the path name. The underlying `problem' is that
1389  CRAB does not overwrite existing output files so if the output
1390  file already exists CRAB will fail to copy back the output.
1391 
1392  NOTE: It's not possible to create different kinds of
1393  harvesting jobs in a single call to this tool. However, in
1394  principle it could be possible to create both data and MC jobs
1395  in a single go.
1396 
1397  NOTE: The number of events used in the path name is the
1398  _total_ number of events in the dataset/run at the time of
1399  harvesting. If we're doing partial harvesting the final
1400  results will reflect lower statistics. This is a) the easiest
1401  to code and b) the least likely to lead to confusion if
1402  someone ever decides to swap/copy around file blocks between
1403  sites.
1404 
1405  """
1406 
1407  castor_path = castor_path_common
1408 
1409  ###
1410 
1411  # The run number part.
1412  castor_path = os.path.join(castor_path, "run_%d" % run_number)
1413 
1414  ###
1415 
1416  # The event count (i.e. the number of events we currently see
1417  # for this dataset).
1418  #nevents = self.datasets_information[dataset_name] \
1419  # ["num_events"][run_number]
1420  castor_path = os.path.join(castor_path, "nevents")
1421 
1422  ###
1423 
1424  castor_path = os.path.normpath(castor_path)
1425 
1426  # End of create_castor_path_name_special.
1427  return castor_path
1428 
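# Illustrative example (not part of the original file): for run 123456 the
# pieces appended above are
#
#   <castor_path_common>/run_123456/nevents
#
# (note that with the event count computation commented out, the literal
# string "nevents" ends up in the path).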
1429  ##########
1430 
1431  def create_and_check_castor_dirs(self):
1432  """Make sure all required CASTOR output dirs exist.
1433 
1434  This checks the CASTOR base dir specified by the user as well
1435  as all the subdirs required by the current set of jobs.
1436 
1437  """
1438 
1439  self.logger.info("Checking (and if necessary creating) CASTOR " \
1440  "output area(s)...")
1441 
1442  # Call the real checker method for the base dir.
1443  self.create_and_check_castor_dir(self.castor_base_dir)
1444 
1445  # Now call the checker for all (unique) subdirs.
1446  castor_dirs = []
1447  for (dataset_name, runs) in six.iteritems(self.datasets_to_use):
1448 
1449  for run in runs:
1450  castor_dirs.append(self.datasets_information[dataset_name] \
1451  ["castor_path"][run])
1452  castor_dirs_unique = sorted(set(castor_dirs))
1453  # This can take some time. E.g. CRAFT08 has > 300 runs, each
1454  # of which will get a new directory. So we show some (rough)
1455  # info in between.
1456  ndirs = len(castor_dirs_unique)
1457  step = max(ndirs / 10, 1)
1458  for (i, castor_dir) in enumerate(castor_dirs_unique):
1459  if (i + 1) % step == 0 or \
1460  (i + 1) == ndirs:
1461  self.logger.info(" %d/%d" % \
1462  (i + 1, ndirs))
1463  self.create_and_check_castor_dir(castor_dir)
1464 
1465  # Now check if the directory is empty. If (an old version
1466  # of) the output file already exists CRAB will run new
1467  # jobs but never copy the results back. We assume the user
1468  # knows what they are doing and only issue a warning in
1469  # case the directory is not empty.
1470  self.logger.debug("Checking if path `%s' is empty" % \
1471  castor_dir)
1472  cmd = "rfdir %s" % castor_dir
1473  (status, output) = commands.getstatusoutput(cmd)
1474  if status != 0:
1475  msg = "Could not access directory `%s'" \
1476  " !!! This is bad since I should have just" \
1477  " created it !!!" % castor_dir
1478  self.logger.fatal(msg)
1479  raise Error(msg)
1480  if len(output) > 0:
1481  self.logger.warning("Output directory `%s' is not empty:" \
1482  " new jobs will fail to" \
1483  " copy back output" % \
1484  castor_dir)
1485 
1486  # End of create_and_check_castor_dirs.
1487 
1488  ##########
1489 
1490  def create_and_check_castor_dir(self, castor_dir):
1491  """Check existence of the given CASTOR dir, if necessary create
1492  it.
1493 
1494  Some special care has to be taken with several things like
1495  setting the correct permissions such that CRAB can store the
1496  output results. Of course this means that things like
1497  /castor/cern.ch/ and user/j/ have to be recognised and treated
1498  properly.
1499 
1500  NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1501  the moment.
1502 
1503  NOTE: This method uses some slightly tricky caching to make
1504  sure we don't keep over and over checking the same base paths.
1505 
1506  """
1507 
1508  ###
1509 
1510  # Local helper function to fully split a path into pieces.
1511  def split_completely(path):
1512  (parent_path, name) = os.path.split(path)
1513  if name == "":
1514  return (parent_path, )
1515  else:
1516  return split_completely(parent_path) + (name, )
1517 
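# Illustrative example (not part of the original file):
#
#   split_completely("/castor/cern.ch/cms")
#     -> ("/", "castor", "cern.ch", "cms")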
1518  ###
1519 
1520  # Local helper function to check rfio (i.e. CASTOR)
1521  # directories.
1522  def extract_permissions(rfstat_output):
1523  """Parse the output from rfstat and return the
1524  5-digit permissions string."""
1525 
1526  permissions_line = [i for i in rfstat_output.split("\n") \
1527  if i.lower().find("protection") > -1]
1528  regexp = re.compile(r".*\(([0123456789]{5})\).*")
1529  match = regexp.search(rfstat_output)
1530  if not match or len(match.groups()) != 1:
1531  msg = "Could not extract permissions " \
1532  "from output: %s" % rfstat_output
1533  self.logger.fatal(msg)
1534  raise Error(msg)
1535  permissions = match.group(1)
1536 
1537  # End of extract_permissions.
1538  return permissions
1539 
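# Illustrative example (not part of the original file); the exact rfstat
# output format is an assumption, but given a line looking like
#
#   Protection              : drwxrwxr-x (40775)
#
# extract_permissions() returns the five-digit string "40775".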
1540  ###
1541 
1542  # These are the pieces of CASTOR directories that we do not
1543  # want to touch when modifying permissions.
1544 
1545  # NOTE: This is all a bit involved, basically driven by the
1546  # fact that one wants to treat the `j' directory of
1547  # `/castor/cern.ch/user/j/jhegeman/' specially.
1548  # BUG BUG BUG
1549  # This should be simplified, for example by comparing to the
1550  # CASTOR prefix or something like that.
1551  # BUG BUG BUG end
1552  castor_paths_dont_touch = {
1553  0: ["/", "castor", "cern.ch", "cms", "store", "temp",
1554  "dqm", "offline", "user"],
1555  -1: ["user", "store"]
1556  }
1557 
1558  self.logger.debug("Checking CASTOR path `%s'" % castor_dir)
1559 
1560  ###
1561 
1562  # First we take the full CASTOR path apart.
1563  castor_path_pieces = split_completely(castor_dir)
1564 
1565  # Now slowly rebuild the CASTOR path and see if a) all
1566  # permissions are set correctly and b) the final destination
1567  # exists.
1568  path = ""
1569  check_sizes = sorted(castor_paths_dont_touch.keys())
1570  len_castor_path_pieces = len(castor_path_pieces)
1571  for piece_index in xrange (len_castor_path_pieces):
1572  skip_this_path_piece = False
1573  piece = castor_path_pieces[piece_index]
1574 ## self.logger.debug("Checking CASTOR path piece `%s'" % \
1575 ## piece)
1576  for check_size in check_sizes:
1577  # Do we need to do anything with this?
1578  if (piece_index + check_size) > -1:
1579 ## self.logger.debug("Checking `%s' against `%s'" % \
1580 ## (castor_path_pieces[piece_index + check_size],
1581 ## castor_paths_dont_touch[check_size]))
1582  if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
1583 ## self.logger.debug(" skipping")
1584  skip_this_path_piece = True
1585 ## else:
1586 ## # Piece not in the list, fine.
1587 ## self.logger.debug(" accepting")
1588  # Add piece to the path we're building.
1589 ## self.logger.debug("!!! Skip path piece `%s'? %s" % \
1590 ## (piece, str(skip_this_path_piece)))
1591 ## self.logger.debug("Adding piece to path...")
1592  path = os.path.join(path, piece)
1593 ## self.logger.debug("Path is now `%s'" % \
1594 ## path)
1595 
1596  # Hmmmm, only at this point can we do some caching. Not
1597  # ideal, but okay.
1598  try:
1599  if path in self.castor_path_checks_cache:
1600  continue
1601  except AttributeError:
1602  # This only happens the first time around.
1603  self.castor_path_checks_cache = []
1604  self.castor_path_checks_cache.append(path)
1605 
1606  # Now, unless we're supposed to skip this piece of the
1607  # path, let's make sure it exists and set the permissions
1608  # correctly for use by CRAB. This means that:
1609  # - the final output directory should (at least) have
1610  # permissions 775
1611  # - all directories above that should (at least) have
1612  # permissions 755.
1613 
1614  # BUT: Even though the above permissions are the usual
1615  # ones to use when setting up CASTOR areas for grid job
1616  # output, there is one caveat in case multiple people are
1617  # working in the same CASTOR area. If user X creates
1618  # /a/b/c/ and user Y wants to create /a/b/d/ he/she does
1619  # not have sufficient rights. So: we set all dir
1620  # permissions to 775 to avoid this.
1621 
1622  if not skip_this_path_piece:
1623 
1624  # Ok, first thing: let's make sure this directory
1625  # exists.
1626  # NOTE: The nice complication is of course that the
1627  # usual os.path.isdir() etc. methods don't work for an
1628  # rfio filesystem. So we call rfstat and interpret an
1629  # error as meaning that the path does not exist.
1630  self.logger.debug("Checking if path `%s' exists" % \
1631  path)
1632  cmd = "rfstat %s" % path
1633  (status, output) = commands.getstatusoutput(cmd)
1634  if status != 0:
1635  # Path does not exist, let's try and create it.
1636  self.logger.debug("Creating path `%s'" % path)
1637  cmd = "nsmkdir -m 775 %s" % path
1638  (status, output) = commands.getstatusoutput(cmd)
1639  if status != 0:
1640  msg = "Could not create directory `%s'" % path
1641  self.logger.fatal(msg)
1642  raise Error(msg)
1643  cmd = "rfstat %s" % path
1644  (status, output) = commands.getstatusoutput(cmd)
1645  # Now check that it looks like a directory. If I'm not
1646  # mistaken one can deduce this from the fact that the
1647  # (octal) permissions string starts with `40' (instead
1648  # of `100').
1649  permissions = extract_permissions(output)
1650  if not permissions.startswith("40"):
1651  msg = "Path `%s' is not a directory(?)" % path
1652  self.logger.fatal(msg)
1653  raise Error(msg)
1654 
1655  # Figure out the current permissions for this
1656  # (partial) path.
1657  self.logger.debug("Checking permissions for path `%s'" % path)
1658  cmd = "rfstat %s" % path
1659  (status, output) = commands.getstatusoutput(cmd)
1660  if status != 0:
1661  msg = "Could not obtain permissions for directory `%s'" % \
1662  path
1663  self.logger.fatal(msg)
1664  raise Error(msg)
1665  # Take the last three digits of the permissions.
1666  permissions = extract_permissions(output)[-3:]
1667 
1668  # Now if necessary fix permissions.
1669  # NOTE: Be careful never to `downgrade' permissions.
1670  if piece_index == (len_castor_path_pieces - 1):
1671  # This means we're looking at the final
1672  # destination directory.
1673  permissions_target = "775"
1674  else:
1675  # `Only' an intermediate directory.
1676  permissions_target = "775"
1677 
1678  # Compare permissions.
1679  permissions_new = []
1680  for (i, j) in zip(permissions, permissions_target):
1681  permissions_new.append(str(max(int(i), int(j))))
1682  permissions_new = "".join(permissions_new)
1683  self.logger.debug(" current permissions: %s" % \
1684  permissions)
1685  self.logger.debug(" target permissions : %s" % \
1686  permissions_target)
1687  if permissions_new != permissions:
1688  # We have to modify the permissions.
1689  self.logger.debug("Changing permissions of `%s' " \
1690  "to %s (were %s)" % \
1691  (path, permissions_new, permissions))
1692  cmd = "rfchmod %s %s" % (permissions_new, path)
1693  (status, output) = commands.getstatusoutput(cmd)
1694  if status != 0:
1695  msg = "Could not change permissions for path `%s' " \
1696  "to %s" % (path, permissions_new)
1697  self.logger.fatal(msg)
1698  raise Error(msg)
1699 
1700  self.logger.debug(" Permissions ok (%s)" % permissions_new)
1701 
1702  # End of create_and_check_castor_dir.
1703 
1704  ##########
1705 
1706  def pick_a_site(self, sites, cmssw_version):
1707 
1708  # Create list of forbidden sites
1709  sites_forbidden = []
1710 
1711  if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
1712  self.caf_access = True
1713 
1714  if self.caf_access == False:
1715  sites_forbidden.append("caf.cern.ch")
1716 
1717  # These are the T1 sites. These are only forbidden if we're
1718  # running in non-T1 mode.
1719  # Source:
1720  # https://cmsweb.cern.ch/sitedb/sitelist/?naming_scheme=ce
1721  # Hard-coded, yes. Not nice, no.
1722 
1723  all_t1 = [
1724  "srm-cms.cern.ch",
1725  "ccsrm.in2p3.fr",
1726  "cmssrm-fzk.gridka.de",
1727  "cmssrm.fnal.gov",
1728  "gridka-dCache.fzk.de",
1729  "srm-cms.gridpp.rl.ac.uk",
1730  "srm.grid.sinica.edu.tw",
1731  "srm2.grid.sinica.edu.tw",
1732  "srmcms.pic.es",
1733  "storm-fe-cms.cr.cnaf.infn.it"
1734  ]
1735 
1736  country_codes = {
1737  "CAF" : "caf.cern.ch",
1738  "CH" : "srm-cms.cern.ch",
1739  "FR" : "ccsrm.in2p3.fr",
1740  "DE" : "cmssrm-fzk.gridka.de",
1741  "GOV" : "cmssrm.fnal.gov",
1742  "DE2" : "gridka-dCache.fzk.de",
1743  "UK" : "srm-cms.gridpp.rl.ac.uk",
1744  "TW" : "srm.grid.sinica.edu.tw",
1745  "TW2" : "srm2.grid.sinica.edu.tw",
1746  "ES" : "srmcms.pic.es",
1747  "IT" : "storm-fe-cms.cr.cnaf.infn.it"
1748  }
1749 
1750  if self.non_t1access:
1751  sites_forbidden.extend(all_t1)
1752 
1753  for site in sites_forbidden:
1754  if site in sites:
1755  sites.remove(site)
1756 
1757  if self.preferred_site in country_codes:
1758  self.preferred_site = country_codes[self.preferred_site]
1759 
1760  if self.preferred_site != "no preference":
1761  if self.preferred_site in sites:
1762  sites = [self.preferred_site]
1763  else:
1764  sites = []
1765 
1766  #print sites
1767 
1768  # Looks like we have to do some caching here, otherwise things
1769  # become waaaay toooo sloooooow. So that's what the
1770  # sites_and_versions_cache does.
1771 
1772  # NOTE: Keep this set to None!
1773  site_name = None
1774  cmd = None
1775  while len(sites) > 0 and \
1776  site_name is None:
1777 
1778  # Create list of t1_sites
1779  t1_sites = []
1780  for site in sites:
1781  if site in all_t1:
1782  t1_sites.append(site)
1783  if site == "caf.cern.ch":
1784  t1_sites.append(site)
1785 
1786  # If available, pick the preferred site
1787  #if self.preferred_site in sites:
1788  # se_name = self.preferred_site
1789  # Else, if available, pick a T1 site
1790 
1791  if len(t1_sites) > 0:
1792  se_name = choice(t1_sites)
1793  # Else pick any site
1794  else:
1795  se_name = choice(sites)
1796 
1797  # But check that it hosts the CMSSW version we want.
1798 
1799  if se_name in self.sites_and_versions_cache and \
1800  cmssw_version in self.sites_and_versions_cache[se_name]:
1801  if self.sites_and_versions_cache[se_name][cmssw_version]:
1802  site_name = se_name
1803  break
1804  else:
1805  self.logger.debug(" --> rejecting site `%s'" % se_name)
1806  sites.remove(se_name)
1807 
1808  else:
1809  self.logger.info("Checking if site `%s' " \
1810  "has CMSSW version `%s'" % \
1811  (se_name, cmssw_version))
1812  self.sites_and_versions_cache[se_name] = {}
1813 
1814  # TODO TODO TODO
1815  # Test for SCRAM architecture removed as per request
1816  # from Andreas.
1817  # scram_arch = os.getenv("SCRAM_ARCH")
1818  # cmd = "lcg-info --list-ce " \
1819  # "--query '" \
1820  # "Tag=VO-cms-%s," \
1821  # "Tag=VO-cms-%s," \
1822  # "CEStatus=Production," \
1823  # "CloseSE=%s'" % \
1824  # (cmssw_version, scram_arch, se_name)
1825  # TODO TODO TODO end
1826 
1827  cmd = "lcg-info --list-ce " \
1828  "--query '" \
1829  "Tag=VO-cms-%s," \
1830  "CEStatus=Production," \
1831  "CloseSE=%s'" % \
1832  (cmssw_version, se_name)
1833  (status, output) = commands.getstatusoutput(cmd)
1834  if status != 0:
1835  self.logger.error("Could not check site information " \
1836  "for site `%s'" % se_name)
1837  else:
1838  if (len(output) > 0) or (se_name == "caf.cern.ch"):
1839  self.sites_and_versions_cache[se_name][cmssw_version] = True
1840  site_name = se_name
1841  break
1842  else:
1843  self.sites_and_versions_cache[se_name][cmssw_version] = False
1844  self.logger.debug(" --> rejecting site `%s'" % se_name)
1845  sites.remove(se_name)
1846 
1847  if site_name is None:
1848  self.logger.error(" --> no matching site found")
1849  self.logger.error(" --> Your release or SCRAM " \
1850  "architecture may not be available " \
1851  "anywhere on the (LCG) grid.")
1852  if not cmd is None:
1853  self.logger.debug(" (command used: `%s')" % cmd)
1854  else:
1855  self.logger.debug(" --> selected site `%s'" % site_name)
1856 
1857  # Return something more descriptive (than `None') in case we
1858  # found nothing.
1859  if site_name is None:
1860  site_name = self.no_matching_site_found_str
1861  # Keep track of our global flag signifying that this
1862  # happened.
1863  self.all_sites_found = False
1864 
1865  # End of pick_a_site.
1866  return site_name
1867 
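# Illustration: `sites_and_versions_cache' used above is a two-level
# mapping SE name -> CMSSW version -> bool (True if the site advertises
# that release), and `country_codes' merely translates the short codes
# accepted by --site (e.g. `FR') into SE names. A minimal sketch of the
# look-up-or-ask pattern; `cached_site_has_version' and `probe' are
# hypothetical names standing in for the lcg-info call:
def cached_site_has_version(cache, se_name, cmssw_version, probe):
    """Return True if `se_name' hosts `cmssw_version', calling `probe'
    only on a cache miss."""
    per_site = cache.setdefault(se_name, {})
    if cmssw_version not in per_site:
        per_site[cmssw_version] = probe(se_name, cmssw_version)
    return per_site[cmssw_version]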
1868  ##########
1869 
1870  def parse_cmd_line_options(self):
1871 
1872  # Set up the command line parser. Note that we fix up the help
1873  # formatter so that we can add some text pointing people to
1874  # the Twiki etc.
1875  parser = optparse.OptionParser(version="%s %s" % \
1876  ("%prog", self.version),
1877  formatter=CMSHarvesterHelpFormatter())
1878 
1879  self.option_parser = parser
1880 
1881  # The debug switch.
1882  parser.add_option("-d", "--debug",
1883  help="Switch on debug mode",
1884  action="callback",
1885  callback=self.option_handler_debug)
1886 
1887  # The quiet switch.
1888  parser.add_option("-q", "--quiet",
1889  help="Be less verbose",
1890  action="callback",
1891  callback=self.option_handler_quiet)
1892 
1893  # The force switch. If this switch is used sanity checks are
1894  # performed but failures do not lead to aborts. Use with care.
1895  parser.add_option("", "--force",
1896  help="Force mode. Do not abort on sanity check "
1897  "failures",
1898  action="callback",
1899  callback=self.option_handler_force)
1900 
1901  # Choose between the different kinds of harvesting.
1902  parser.add_option("", "--harvesting_type",
1903  help="Harvesting type: %s" % \
1904  ", ".join(self.harvesting_types),
1905  action="callback",
1906  callback=self.option_handler_harvesting_type,
1907  type="string",
1908  metavar="HARVESTING_TYPE")
1909 
1910  # Choose between single-step and two-step mode.
1911  parser.add_option("", "--harvesting_mode",
1912  help="Harvesting mode: %s (default = %s)" % \
1913  (", ".join(self.harvesting_modes),
1914  self.harvesting_mode_default),
1915  action="callback",
1916  callback=self.option_handler_harvesting_mode,
1917  type="string",
1918  metavar="HARVESTING_MODE")
1919 
1920  # Override the GlobalTag chosen by the cmsHarvester.
1921  parser.add_option("", "--globaltag",
1922  help="GlobalTag to use. Default is the one " \
1923  "the dataset was created with for MC; for data " \
1924  "a GlobalTag has to be specified.",
1925  action="callback",
1926  callback=self.option_handler_globaltag,
1927  type="string",
1928  metavar="GLOBALTAG")
1929 
1930  # Allow switching off of reference histograms.
1931  parser.add_option("", "--no-ref-hists",
1932  help="Don't use any reference histograms",
1933  action="callback",
1934  callback=self.option_handler_no_ref_hists)
1935 
1936  # Allow the default (i.e. the one that should be used)
1937  # Frontier connection to be overridden.
1938  parser.add_option("", "--frontier-connection",
1939  help="Use this Frontier connection to find " \
1940  "GlobalTags and LocalTags (for reference " \
1941  "histograms).\nPlease only use this for " \
1942  "testing.",
1943  action="callback",
1944  callback=self.option_handler_frontier_connection,
1945  type="string",
1946  metavar="FRONTIER")
1947 
1948  # Similar to the above but specific to the Frontier connection
1949  # to be used for the GlobalTag.
1950  parser.add_option("", "--frontier-connection-for-globaltag",
1951  help="Use this Frontier connection to find " \
1952  "GlobalTags.\nPlease only use this for " \
1953  "testing.",
1954  action="callback",
1955  callback=self.option_handler_frontier_connection,
1956  type="string",
1957  metavar="FRONTIER")
1958 
1959  # Similar to the above but specific to the Frontier connection
1960  # to be used for the reference histograms.
1961  parser.add_option("", "--frontier-connection-for-refhists",
1962  help="Use this Frontier connection to find " \
1963  "LocalTags (for reference " \
1964  "histograms).\nPlease only use this for " \
1965  "testing.",
1966  action="callback",
1967  callback=self.option_handler_frontier_connection,
1968  type="string",
1969  metavar="FRONTIER")
1970 
1971  # Option to specify the name (or a regexp) of the dataset(s)
1972  # to be used.
1973  parser.add_option("", "--dataset",
1974  help="Name (or regexp) of dataset(s) to process",
1975  action="callback",
1976  #callback=self.option_handler_dataset_name,
1977  callback=self.option_handler_input_spec,
1978  type="string",
1979  #dest="self.input_name",
1980  metavar="DATASET")
1981 
1982  # Option to specify the name (or a regexp) of the dataset(s)
1983  # to be ignored.
1984  parser.add_option("", "--dataset-ignore",
1985  help="Name (or regexp) of dataset(s) to ignore",
1986  action="callback",
1987  callback=self.option_handler_input_spec,
1988  type="string",
1989  metavar="DATASET-IGNORE")
1990 
1991  # Option to specify the name (or a regexp) of the run(s)
1992  # to be used.
1993  parser.add_option("", "--runs",
1994  help="Run number(s) to process",
1995  action="callback",
1996  callback=self.option_handler_input_spec,
1997  type="string",
1998  metavar="RUNS")
1999 
2000  # Option to specify the name (or a regexp) of the run(s)
2001  # to be ignored.
2002  parser.add_option("", "--runs-ignore",
2003  help="Run number(s) to ignore",
2004  action="callback",
2005  callback=self.option_handler_input_spec,
2006  type="string",
2007  metavar="RUNS-IGNORE")
2008 
2009  # Option to specify a file containing a list of dataset names
2010  # (or regexps) to be used.
2011  parser.add_option("", "--datasetfile",
2012  help="File containing list of dataset names " \
2013  "(or regexps) to process",
2014  action="callback",
2015  #callback=self.option_handler_listfile_name,
2016  callback=self.option_handler_input_spec,
2017  type="string",
2018  #dest="self.input_name",
2019  metavar="DATASETFILE")
2020 
2021  # Option to specify a file containing a list of dataset names
2022  # (or regexps) to be ignored.
2023  parser.add_option("", "--datasetfile-ignore",
2024  help="File containing list of dataset names " \
2025  "(or regexps) to ignore",
2026  action="callback",
2027  callback=self.option_handler_input_spec,
2028  type="string",
2029  metavar="DATASETFILE-IGNORE")
2030 
2031  # Option to specify a file containing a list of runs to be
2032  # used.
2033  parser.add_option("", "--runslistfile",
2034  help="File containing list of run numbers " \
2035  "to process",
2036  action="callback",
2037  callback=self.option_handler_input_spec,
2038  type="string",
2039  metavar="RUNSLISTFILE")
2040 
2041  # Option to specify a file containing a list of runs
2042  # to be ignored.
2043  parser.add_option("", "--runslistfile-ignore",
2044  help="File containing list of run numbers " \
2045  "to ignore",
2046  action="callback",
2047  callback=self.option_handler_input_spec,
2048  type="string",
2049  metavar="RUNSLISTFILE-IGNORE")
2050 
2051  # Option to specify a Jsonfile containing a list of runs
2052  # to be used.
2053  parser.add_option("", "--Jsonrunfile",
2054  help="Jsonfile containing dictionary of run/lumisections pairs. " \
2055  "All lumisections of runs contained in dictionary are processed.",
2056  action="callback",
2057  callback=self.option_handler_input_Jsonrunfile,
2058  type="string",
2059  metavar="JSONRUNFILE")
2060 
2061  # Option to specify a Jsonfile containing a dictionary of run/lumisections pairs
2062  # to be used.
2063  parser.add_option("", "--Jsonfile",
2064  help="Jsonfile containing dictionary of run/lumisections pairs. " \
2065  "Only specified lumisections of runs contained in dictionary are processed.",
2066  action="callback",
2067  callback=self.option_handler_input_Jsonfile,
2068  type="string",
2069  metavar="JSONFILE")
2070 
2071  # Option to specify a ToDo file containing a list of runs
2072  # to be used.
2073  parser.add_option("", "--todo-file",
2074  help="Todo file containing a list of runs to process.",
2075  action="callback",
2076  callback=self.option_handler_input_todofile,
2077  type="string",
2078  metavar="TODO-FILE")
2079 
2080  # Option to specify which file to use for the dataset name to
2081  # reference histogram name mappings.
2082  parser.add_option("", "--refhistmappingfile",
2083  help="File to be used for the reference " \
2084  "histogram mappings. Default: `%s'." % \
2085  self.ref_hist_mappings_file_name_default,
2086  action="callback",
2087  callback=self.option_handler_ref_hist_mapping_file,
2088  type="string",
2089  metavar="REFHISTMAPPING-FILE")
2090 
2091  # Specify the place in CASTOR where the output should go.
2092  # NOTE: Only output to CASTOR is supported for the moment,
2093  # since the central DQM results place is on CASTOR anyway.
2094  parser.add_option("", "--castordir",
2095  help="Place on CASTOR to store results. " \
2096  "Default: `%s'." % \
2097  self.castor_base_dir_default,
2098  action="callback",
2099  callback=self.option_handler_castor_dir,
2100  type="string",
2101  metavar="CASTORDIR")
2102 
2103  # Use this to try and create jobs that will run without
2104  # special `t1access' role.
2105  parser.add_option("", "--no-t1access",
2106  help="Try to create jobs that will run " \
2107  "without special `t1access' role",
2108  action="callback",
2109  callback=self.option_handler_no_t1access)
2110 
2111  # Use this to create jobs that may run on CAF
2112  parser.add_option("", "--caf-access",
2113  help="Crab jobs may run " \
2114  "on CAF",
2115  action="callback",
2116  callback=self.option_handler_caf_access)
2117 
2118  # set process.dqmSaver.saveByLumiSection=1 in harvesting cfg file
2119  parser.add_option("", "--saveByLumiSection",
2120  help="set saveByLumiSection=1 in harvesting cfg file",
2121  action="callback",
2122  callback=self.option_handler_saveByLumiSection)
2123 
2124  # Use this to enable automatic creation and submission of crab jobs
2125  parser.add_option("", "--automatic-crab-submission",
2126  help="Crab jobs are created and " \
2127  "submitted automatically",
2128  action="callback",
2129  callback=self.option_handler_crab_submission)
2130 
2131  # Option to set the max. number of sites each
2132  # job is submitted to
2133  parser.add_option("", "--max-sites",
2134  help="Max. number of sites each job is submitted to",
2135  action="callback",
2136  callback=self.option_handler_sites,
2137  type="int")
2138 
2139  # Option to set the preferred site
2140  parser.add_option("", "--site",
2141  help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
2142  srm-cms.cern.ch : CH \
2143  ccsrm.in2p3.fr : FR \
2144  cmssrm-fzk.gridka.de : DE \
2145  cmssrm.fnal.gov : GOV \
2146  gridka-dCache.fzk.de : DE2 \
2147  srm-cms.gridpp.rl.ac.uk : UK \
2148  srm.grid.sinica.edu.tw : TW \
2149  srm2.grid.sinica.edu.tw : TW2 \
2150  srmcms.pic.es : ES \
2151  storm-fe-cms.cr.cnaf.infn.it : IT",
2152  action="callback",
2153  callback=self.option_handler_preferred_site,
2154  type="string")
2155 
2156  # This is the command line flag to list all harvesting
2157  # type-to-sequence mappings.
2158  parser.add_option("-l", "--list",
2159  help="List all harvesting types and their " \
2160  "corresponding sequence names",
2161  action="callback",
2162  callback=self.option_handler_list_types)
2163 
2164  # If nothing was specified: tell the user how to do things the
2165  # next time and exit.
2166  # NOTE: We just use the OptParse standard way of doing this by
2167  # acting as if a '--help' was specified.
2168  if len(self.cmd_line_opts) < 1:
2169  self.cmd_line_opts = ["--help"]
2170 
2171  # Some trickery with the options. Why? Well, since these
2172  # options change the output level immediately from the option
2173  # handlers, the results differ depending on where they are on
2174  # the command line. Let's just make sure they are at the
2175  # front.
2176  # NOTE: Not very efficient or sophisticated, but it works and
2177  # it only has to be done once anyway.
2178  for i in ["-d", "--debug",
2179  "-q", "--quiet"]:
2180  if i in self.cmd_line_opts:
2181  self.cmd_line_opts.remove(i)
2182  self.cmd_line_opts.insert(0, i)
2183 
2184  # Everything is set up, now parse what we were given.
2185  parser.set_defaults()
2186  (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2187 
2188  # End of parse_cmd_line_options.
2189 
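# Illustration: the reshuffling above matters because `-d'/`-q' are
# callback options, so they take effect the moment they are parsed;
# hoisting them to the front makes the chosen verbosity apply to the
# handling of everything that follows. The effect in isolation (the
# helper name is hypothetical):
def hoist_verbosity_flags(opts, flags=("-d", "--debug", "-q", "--quiet")):
    """Return a copy of `opts' with any verbosity flags moved to the front."""
    hoisted = [i for i in opts if i in flags]
    rest = [i for i in opts if i not in flags]
    return hoisted + rest
# e.g. hoist_verbosity_flags(["--dataset", "/A/B/RECO", "-d"])
#      -> ["-d", "--dataset", "/A/B/RECO"]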
2190  ##########
2191 
2192  def check_input_status(self):
2193  """Check completeness and correctness of input information.
2194 
2195  Check that all required information has been specified and
2196  that, at least as far as can be easily checked, it makes
2197  sense.
2198 
2199  NOTE: This is also where any default values are applied.
2200 
2201  """
2202 
2203  self.logger.info("Checking completeness/correctness of input...")
2204 
2205  # The cmsHarvester does not take (i.e. understand) any
2206  # arguments so there should not be any.
2207  if len(self.args) > 0:
2208  msg = "Sorry but I don't understand `%s'" % \
2209  (" ".join(self.args))
2210  self.logger.fatal(msg)
2211  raise Usage(msg)
2212 
2213  # BUG BUG BUG
2214  # While we wait for some bugs left and right to get fixed, we
2215  # disable two-step.
2216  if self.harvesting_mode == "two-step":
2217  msg = "--------------------\n" \
2218  " Sorry, but for the moment (well, till it works)" \
2219  " the two-step mode has been disabled.\n" \
2220  "--------------------\n"
2221  self.logger.fatal(msg)
2222  raise Error(msg)
2223  # BUG BUG BUG end
2224 
2225  # We need a harvesting type to be specified
2226  if self.harvesting_type is None:
2227  msg = "Please specify a harvesting type"
2228  self.logger.fatal(msg)
2229  raise Usage(msg)
2230  # as well as a harvesting mode.
2231  if self.harvesting_mode is None:
2232  self.harvesting_mode = self.harvesting_mode_default
2233  msg = "No harvesting mode specified --> using default `%s'" % \
2234  self.harvesting_mode
2235  self.logger.warning(msg)
2236  #raise Usage(msg)
2237 
2238  ###
2239 
2240  # We need an input method so we can find the dataset name(s).
2241  if self.input_method["datasets"]["use"] is None:
2242  msg = "Please specify an input dataset name " \
2243  "or a list file name"
2244  self.logger.fatal(msg)
2245  raise Usage(msg)
2246 
2247  # DEBUG DEBUG DEBUG
2248  # If we get here, we should also have an input name.
2249  assert not self.input_name["datasets"]["use"] is None
2250  # DEBUG DEBUG DEBUG end
2251 
2252  ###
2253 
2254  # The same holds for the reference histogram mapping file (if
2255  # we're using references).
2256  if self.use_ref_hists:
2257  if self.ref_hist_mappings_file_name is None:
2258  self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2259  msg = "No reference histogram mapping file specified --> " \
2260  "using default `%s'" % \
2261  self.ref_hist_mappings_file_name
2262  self.logger.warning(msg)
2263 
2264  ###
2265 
2266  # We need to know where to put the stuff (okay, the results)
2267  # on CASTOR.
2268  if self.castor_base_dir is None:
2269  self.castor_base_dir = self.castor_base_dir_default
2270  msg = "No CASTOR area specified -> using default `%s'" % \
2271  self.castor_base_dir
2272  self.logger.warning(msg)
2273  #raise Usage(msg)
2274 
2275  # Only the CERN CASTOR area is supported.
2276  if not self.castor_base_dir.startswith(self.castor_prefix):
2277  msg = "CASTOR area does not start with `%s'" % \
2278  self.castor_prefix
2279  self.logger.fatal(msg)
2280  if self.castor_base_dir.find("castor") > -1 and \
2281  not self.castor_base_dir.find("cern.ch") > -1:
2282  self.logger.fatal("Only CERN CASTOR is supported")
2283  raise Usage(msg)
2284 
2285  ###
2286 
2287  # TODO TODO TODO
2288  # This should be removed in the future, once I find out how to
2289  # get the config file used to create a given dataset from DBS.
2290 
2291  # For data we need to have a GlobalTag. (For MC we can figure
2292  # it out by ourselves.)
2293  if self.globaltag is None:
2294  self.logger.warning("No GlobalTag specified. This means I cannot")
2295  self.logger.warning("run on data, only on MC.")
2296  self.logger.warning("I will skip all data datasets.")
2297 
2298  # TODO TODO TODO end
2299 
2300  # Make sure the GlobalTag ends with `::All'.
2301  if not self.globaltag is None:
2302  if not self.globaltag.endswith("::All"):
2303  self.logger.warning("Specified GlobalTag `%s' does " \
2304  "not end in `::All' --> " \
2305  "appending this missing piece" % \
2306  self.globaltag)
2307  self.globaltag = "%s::All" % self.globaltag
2308 
2309  ###
2310 
2311  # Dump some info about the Frontier connections used.
2312  for (key, value) in six.iteritems(self.frontier_connection_name):
2313  frontier_type_str = "unknown"
2314  if key == "globaltag":
2315  frontier_type_str = "the GlobalTag"
2316  elif key == "refhists":
2317  frontier_type_str = "the reference histograms"
2318  non_str = None
2319  if self.frontier_connection_overridden[key] == True:
2320  non_str = "non-"
2321  else:
2322  non_str = ""
2323  self.logger.info("Using %sdefault Frontier " \
2324  "connection for %s: `%s'" % \
2325  (non_str, frontier_type_str, value))
2326 
2327  ###
2328 
2329  # End of check_input_status.
2330 
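# Illustration: the GlobalTag handling above boils down to a simple
# normalisation rule. Written as a stand-alone sketch (the helper name
# is hypothetical):
def normalize_globaltag(globaltag):
    """Append the `::All' suffix unless it is already there; None is
    passed through untouched."""
    if globaltag is None or globaltag.endswith("::All"):
        return globaltag
    return "%s::All" % globaltag
# e.g. normalize_globaltag("SOME_GLOBALTAG") -> "SOME_GLOBALTAG::All"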
2331  ##########
2332 
2333  def check_cmssw(self):
2334  """Check if CMSSW is setup.
2335 
2336  """
2337 
2338  # Try to access the CMSSW_VERSION environment variable. If
2339  # it's something useful we consider CMSSW to be set up
2340  # properly. Otherwise we raise an error.
2341  cmssw_version = os.getenv("CMSSW_VERSION")
2342  if cmssw_version is None:
2343  self.logger.fatal("It seems CMSSW is not setup...")
2344  self.logger.fatal("($CMSSW_VERSION is empty)")
2345  raise Error("ERROR: CMSSW needs to be setup first!")
2346 
2347  self.cmssw_version = cmssw_version
2348  self.logger.info("Found CMSSW version %s properly set up" % \
2349  self.cmssw_version)
2350 
2351  # End of check_cmssw.
2352  return True
2353 
2354  ##########
2355 
2356  def check_dbs(self):
2357  """Check if DBS is setup.
2358 
2359  """
2360 
2361  # Try to access the DBSCMD_HOME environment variable. If this
2362  # looks useful we consider DBS to be set up
2363  # properly. Otherwise we raise an error.
2364  dbs_home = os.getenv("DBSCMD_HOME")
2365  if dbs_home is None:
2366  self.logger.fatal("It seems DBS is not setup...")
2367  self.logger.fatal(" $DBSCMD_HOME is empty")
2368  raise Error("ERROR: DBS needs to be setup first!")
2369 
2370 ## # Now we try to do a very simple DBS search. If that works
2371 ## # instead of giving us the `Unsupported API call' crap, we
2372 ## # should be good to go.
2373 ## # NOTE: Not ideal, I know, but it reduces the amount of
2374 ## # complaints I get...
2375 ## cmd = "dbs search --query=\"find dataset where dataset = impossible\""
2376 ## (status, output) = commands.getstatusoutput(cmd)
2377 ## pdb.set_trace()
2378 ## if status != 0 or \
2379 ## output.lower().find("unsupported api call") > -1:
2380 ## self.logger.fatal("It seems DBS is not setup...")
2381 ## self.logger.fatal(" %s returns crap:" % cmd)
2382 ## for line in output.split("\n"):
2383 ## self.logger.fatal(" %s" % line)
2384 ## raise Error("ERROR: DBS needs to be setup first!")
2385 
2386  self.logger.debug("Found DBS properly set up")
2387 
2388  # End of check_dbs.
2389  return True
2390 
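# Illustration: both environment checks above follow the same pattern.
# A generic sketch (hypothetical helper, relying on the Error class and
# the os module already used in this script):
def require_env(var_name, hint):
    """Return the value of `var_name' or raise Error with `hint'."""
    value = os.getenv(var_name)
    if value is None:
        raise Error("ERROR: %s" % hint)
    return value
# e.g. require_env("CMSSW_VERSION", "CMSSW needs to be setup first!")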
2391  ##########
2392 
2393  def setup_dbs(self):
2394  """Setup the Python side of DBS.
2395 
2396  For more information see the DBS Python API documentation:
2397  https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2398 
2399  """
2400 
2401  try:
2402  args={}
2403  args["url"]= "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2404  "servlet/DBSServlet"
2405  api = DbsApi(args)
2406  self.dbs_api = api
2407 
2408  except DBSAPI.dbsApiException.DbsApiException as ex:
2409  self.logger.fatal("Caught DBS API exception %s: %s " % \
2410  (ex.getClassName(), ex.getErrorMessage()))
2411  if ex.getErrorCode() not in (None, ""):
2412  self.logger.debug("DBS exception error code: %s" % ex.getErrorCode())
2413  raise
2414 
2415  # End of setup_dbs.
2416 
2417  ##########
2418 
2419  def dbs_resolve_dataset_name(self, dataset_name):
2420  """Use DBS to resolve a wildcarded dataset name.
2421 
2422  """
2423 
2424  # DEBUG DEBUG DEBUG
2425  # If we get here DBS should have been set up already.
2426  assert not self.dbs_api is None
2427  # DEBUG DEBUG DEBUG end
2428 
2429  # Some minor checking to make sure that whatever we've been
2430  # given as dataset name actually sounds like a dataset name.
2431  if not (dataset_name.startswith("/") and \
2432  dataset_name.endswith("RECO")):
2433  self.logger.warning("Dataset name `%s' does not sound " \
2434  "like a valid dataset name!" % \
2435  dataset_name)
2436 
2437  #----------
2438 
2439  api = self.dbs_api
2440  dbs_query = "find dataset where dataset like %s " \
2441  "and dataset.status = VALID" % \
2442  dataset_name
2443  try:
2444  api_result = api.executeQuery(dbs_query)
2445  except DBSAPI.dbsApiException.DbsApiException:
2446  msg = "ERROR: Could not execute DBS query"
2447  self.logger.fatal(msg)
2448  raise Error(msg)
2449 
2450  # Setup parsing.
2451  handler = DBSXMLHandler(["dataset"])
2452  parser = xml.sax.make_parser()
2453  parser.setContentHandler(handler)
2454 
2455  # Parse.
2456  try:
2457  xml.sax.parseString(api_result, handler)
2458  except SAXParseException:
2459  msg = "ERROR: Could not parse DBS server output"
2460  self.logger.fatal(msg)
2461  raise Error(msg)
2462 
2463  # DEBUG DEBUG DEBUG
2464  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2465  # DEBUG DEBUG DEBUG end
2466 
2467  # Extract the results.
2468  datasets = handler.results.values()[0]
2469 
2470  # End of dbs_resolve_dataset_name.
2471  return datasets
2472 
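# Illustration: each of the dbs_resolve_* methods below repeats the same
# query-and-parse pattern seen above. Factored out it looks roughly like
# this sketch (hypothetical helper; error handling omitted, and it
# assumes the DBSXMLHandler defined earlier in this file plus a DbsApi
# instance):
def run_dbs_query(api, dbs_query, tag_names):
    """Execute `dbs_query' and return the parsed columns `tag_names'."""
    api_result = api.executeQuery(dbs_query)
    handler = DBSXMLHandler(tag_names)
    parser = xml.sax.make_parser()
    parser.setContentHandler(handler)
    xml.sax.parseString(api_result, handler)
    return handler.results
# e.g. run_dbs_query(self.dbs_api,
#                    "find run where dataset = /A/B/RECO "
#                    "and dataset.status = VALID",
#                    ["run"])["run"]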
2473  ##########
2474 
2475  def dbs_resolve_cmssw_version(self, dataset_name):
2476  """Ask DBS for the CMSSW version used to create this dataset.
2477 
2478  """
2479 
2480  # DEBUG DEBUG DEBUG
2481  # If we get here DBS should have been set up already.
2482  assert not self.dbs_api is None
2483  # DEBUG DEBUG DEBUG end
2484 
2485  api = self.dbs_api
2486  dbs_query = "find algo.version where dataset = %s " \
2487  "and dataset.status = VALID" % \
2488  dataset_name
2489  try:
2490  api_result = api.executeQuery(dbs_query)
2491  except DBSAPI.dbsApiException.DbsApiException:
2492  msg = "ERROR: Could not execute DBS query"
2493  self.logger.fatal(msg)
2494  raise Error(msg)
2495 
2496  handler = DBSXMLHandler(["algo.version"])
2497  parser = xml.sax.make_parser()
2498  parser.setContentHandler(handler)
2499 
2500  try:
2501  xml.sax.parseString(api_result, handler)
2502  except SAXParseException:
2503  msg = "ERROR: Could not parse DBS server output"
2504  self.logger.fatal(msg)
2505  raise Error(msg)
2506 
2507  # DEBUG DEBUG DEBUG
2508  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2509  # DEBUG DEBUG DEBUG end
2510 
2511  cmssw_version = handler.results.values()[0]
2512 
2513  # DEBUG DEBUG DEBUG
2514  assert len(cmssw_version) == 1
2515  # DEBUG DEBUG DEBUG end
2516 
2517  cmssw_version = cmssw_version[0]
2518 
2519  # End of dbs_resolve_cmssw_version.
2520  return cmssw_version
2521 
2522  ##########
2523 
2524 ## def dbs_resolve_dataset_number_of_events(self, dataset_name):
2525 ## """Ask DBS across how many events this dataset has been spread
2526 ## out.
2527 
2528 ## This is especially useful to check that we do not submit a job
2529 ## supposed to run on a complete sample that is not contained at
2530 ## a single site.
2531 
2532 ## """
2533 
2534 ## # DEBUG DEBUG DEBUG
2535 ## # If we get here DBS should have been set up already.
2536 ## assert not self.dbs_api is None
2537 ## # DEBUG DEBUG DEBUG end
2538 
2539 ## api = self.dbs_api
2540 ## dbs_query = "find count(site) where dataset = %s " \
2541 ## "and dataset.status = VALID" % \
2542 ## dataset_name
2543 ## try:
2544 ## api_result = api.executeQuery(dbs_query)
2545 ## except DbsApiException:
2546 ## raise Error("ERROR: Could not execute DBS query")
2547 
2548 ## try:
2549 ## num_events = []
2550 ## class Handler(xml.sax.handler.ContentHandler):
2551 ## def startElement(self, name, attrs):
2552 ## if name == "result":
2553 ## num_events.append(str(attrs["COUNT_STORAGEELEMENT"]))
2554 ## xml.sax.parseString(api_result, Handler())
2555 ## except SAXParseException:
2556 ## raise Error("ERROR: Could not parse DBS server output")
2557 
2558 ## # DEBUG DEBUG DEBUG
2559 ## assert len(num_events) == 1
2560 ## # DEBUG DEBUG DEBUG end
2561 
2562 ## num_events = int(num_events[0])
2563 
2564 ## # End of dbs_resolve_dataset_number_of_events.
2565 ## return num_events
2566 
2567  ##########
2568 
2569  def dbs_resolve_runs(self, dataset_name):
2570  """Ask DBS for the list of runs in a given dataset.
2571 
2572  # NOTE: This does not (yet?) skip/remove empty runs. There is
2573  # a bug in the DBS entry run.numevents (i.e. it always returns
2574  # zero) which should be fixed in the `next DBS release'.
2575  # See also:
2576  # https://savannah.cern.ch/bugs/?53452
2577  # https://savannah.cern.ch/bugs/?53711
2578 
2579  """
2580 
2581  # TODO TODO TODO
2582  # We should remove empty runs as soon as the above mentioned
2583  # bug is fixed.
2584  # TODO TODO TODO end
2585 
2586  # DEBUG DEBUG DEBUG
2587  # If we get here DBS should have been set up already.
2588  assert not self.dbs_api is None
2589  # DEBUG DEBUG DEBUG end
2590 
2591  api = self.dbs_api
2592  dbs_query = "find run where dataset = %s " \
2593  "and dataset.status = VALID" % \
2594  dataset_name
2595  try:
2596  api_result = api.executeQuery(dbs_query)
2597  except DBSAPI.dbsApiException.DbsApiException:
2598  msg = "ERROR: Could not execute DBS query"
2599  self.logger.fatal(msg)
2600  raise Error(msg)
2601 
2602  handler = DBSXMLHandler(["run"])
2603  parser = xml.sax.make_parser()
2604  parser.setContentHandler(handler)
2605 
2606  try:
2607  xml.sax.parseString(api_result, handler)
2608  except SAXParseException:
2609  msg = "ERROR: Could not parse DBS server output"
2610  self.logger.fatal(msg)
2611  raise Error(msg)
2612 
2613  # DEBUG DEBUG DEBUG
2614  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2615  # DEBUG DEBUG DEBUG end
2616 
2617  runs = handler.results.values()[0]
2618  # Turn strings into integers.
2619  runs = sorted([int(i) for i in runs])
2620 
2621  # End of dbs_resolve_runs.
2622  return runs
2623 
2624  ##########
2625 
2626  def dbs_resolve_globaltag(self, dataset_name):
2627  """Ask DBS for the globaltag corresponding to a given dataset.
2628 
2629  # BUG BUG BUG
2630  # This does not seem to work for data datasets? E.g. for
2631  # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
2632  # Probably due to the fact that the GlobalTag changed during
2633  # data taking...
2634  BUG BUG BUG end
2635 
2636  """
2637 
2638  # DEBUG DEBUG DEBUG
2639  # If we get here DBS should have been set up already.
2640  assert not self.dbs_api is None
2641  # DEBUG DEBUG DEBUG end
2642 
2643  api = self.dbs_api
2644  dbs_query = "find dataset.tag where dataset = %s " \
2645  "and dataset.status = VALID" % \
2646  dataset_name
2647  try:
2648  api_result = api.executeQuery(dbs_query)
2649  except DBSAPI.dbsApiException.DbsApiException:
2650  msg = "ERROR: Could not execute DBS query"
2651  self.logger.fatal(msg)
2652  raise Error(msg)
2653 
2654  handler = DBSXMLHandler(["dataset.tag"])
2655  parser = xml.sax.make_parser()
2656  parser.setContentHandler(handler)
2657 
2658  try:
2659  xml.sax.parseString(api_result, handler)
2660  except SAXParseException:
2661  msg = "ERROR: Could not parse DBS server output"
2662  self.logger.fatal(msg)
2663  raise Error(msg)
2664 
2665  # DEBUG DEBUG DEBUG
2666  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2667  # DEBUG DEBUG DEBUG end
2668 
2669  globaltag = handler.results.values()[0]
2670 
2671  # DEBUG DEBUG DEBUG
2672  assert len(globaltag) == 1
2673  # DEBUG DEBUG DEBUG end
2674 
2675  globaltag = globaltag[0]
2676 
2677  # End of dbs_resolve_globaltag.
2678  return globaltag
2679 
2680  ##########
2681 
2682  def dbs_resolve_datatype(self, dataset_name):
2683  """Ask DBS for the data type (data or mc) of a given
2684  dataset.
2685 
2686  """
2687 
2688  # DEBUG DEBUG DEBUG
2689  # If we get here DBS should have been set up already.
2690  assert not self.dbs_api is None
2691  # DEBUG DEBUG DEBUG end
2692 
2693  api = self.dbs_api
2694  dbs_query = "find datatype.type where dataset = %s " \
2695  "and dataset.status = VALID" % \
2696  dataset_name
2697  try:
2698  api_result = api.executeQuery(dbs_query)
2699  except DBSAPI.dbsApiException.DbsApiException:
2700  msg = "ERROR: Could not execute DBS query"
2701  self.logger.fatal(msg)
2702  raise Error(msg)
2703 
2704  handler = DBSXMLHandler(["datatype.type"])
2705  parser = xml.sax.make_parser()
2706  parser.setContentHandler(handler)
2707 
2708  try:
2709  xml.sax.parseString(api_result, handler)
2710  except SAXParseException:
2711  msg = "ERROR: Could not parse DBS server output"
2712  self.logger.fatal(msg)
2713  raise Error(msg)
2714 
2715  # DEBUG DEBUG DEBUG
2716  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2717  # DEBUG DEBUG DEBUG end
2718 
2719  datatype = handler.results.values()[0]
2720 
2721  # DEBUG DEBUG DEBUG
2722  assert len(datatype) == 1
2723  # DEBUG DEBUG DEBUG end
2724 
2725  datatype = datatype[0]
2726 
2727  # End of dbs_resolve_datatype.
2728  return datatype
2729 
2730  ##########
2731 
2732  # OBSOLETE OBSOLETE OBSOLETE
2733  # This method is no longer used.
2734 
2735  def dbs_resolve_number_of_events(self, dataset_name, run_number=None):
2736  """Determine the number of events in a given dataset (and run).
2737 
2738  Ask DBS for the number of events in a dataset. If a run number
2739  is specified the number of events returned is that in that run
2740  of that dataset. If problems occur we throw an exception.
2741 
2742  # BUG BUG BUG
2743  # Since DBS does not return the number of events correctly,
2744  # neither for runs nor for whole datasets, we have to work
2745  # around that a bit...
2746  # BUG BUG BUG end
2747 
2748  """
2749 
2750  # DEBUG DEBUG DEBUG
2751  # If we get here DBS should have been set up already.
2752  assert not self.dbs_api is None
2753  # DEBUG DEBUG DEBUG end
2754 
2755  api = self.dbs_api
2756  dbs_query = "find file.name, file.numevents where dataset = %s " \
2757  "and dataset.status = VALID" % \
2758  dataset_name
2759  if not run_number is None:
2760  dbs_query = dbs_query + (" and run = %d" % run_number)
2761  try:
2762  api_result = api.executeQuery(dbs_query)
2763  except DBSAPI.dbsApiException.DbsApiException:
2764  msg = "ERROR: Could not execute DBS query"
2765  self.logger.fatal(msg)
2766  raise Error(msg)
2767 
2768  handler = DBSXMLHandler(["file.name", "file.numevents"])
2769  parser = xml.sax.make_parser()
2770  parser.setContentHandler(handler)
2771 
2772  try:
2773  xml.sax.parseString(api_result, handler)
2774  except SAXParseException:
2775  msg = "ERROR: Could not parse DBS server output"
2776  self.logger.fatal(msg)
2777  raise Error(msg)
2778 
2779  # DEBUG DEBUG DEBUG
2780  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2781  # DEBUG DEBUG DEBUG end
2782 
2783  num_events = sum(handler.results["file.numevents"])
2784 
2785  # End of dbs_resolve_number_of_events.
2786  return num_events
2787 
2788  # OBSOLETE OBSOLETE OBSOLETE end
2789 
2790  ##########
2791 
2792 ## def dbs_resolve_dataset_number_of_sites(self, dataset_name):
2793 ## """Ask DBS across how many sites this dataset has been spread
2794 ## out.
2795 
2796 ## This is especially useful to check that we do not submit a job
2797 ## supposed to run on a complete sample that is not contained at
2798 ## a single site.
2799 
2800 ## """
2801 
2802 ## # DEBUG DEBUG DEBUG
2803 ## # If we get here DBS should have been set up already.
2804 ## assert not self.dbs_api is None
2805 ## # DEBUG DEBUG DEBUG end
2806 
2807 ## api = self.dbs_api
2808 ## dbs_query = "find count(site) where dataset = %s " \
2809 ## "and dataset.status = VALID" % \
2810 ## dataset_name
2811 ## try:
2812 ## api_result = api.executeQuery(dbs_query)
2813 ## except DbsApiException:
2814 ## raise Error("ERROR: Could not execute DBS query")
2815 
2816 ## try:
2817 ## num_sites = []
2818 ## class Handler(xml.sax.handler.ContentHandler):
2819 ## def startElement(self, name, attrs):
2820 ## if name == "result":
2821 ## num_sites.append(str(attrs["COUNT_STORAGEELEMENT"]))
2822 ## xml.sax.parseString(api_result, Handler())
2823 ## except SAXParseException:
2824 ## raise Error("ERROR: Could not parse DBS server output")
2825 
2826 ## # DEBUG DEBUG DEBUG
2827 ## assert len(num_sites) == 1
2828 ## # DEBUG DEBUG DEBUG end
2829 
2830 ## num_sites = int(num_sites[0])
2831 
2832 ## # End of dbs_resolve_dataset_number_of_sites.
2833 ## return num_sites
2834 
2835  ##########
2836 
2837 ## def dbs_check_dataset_spread(self, dataset_name):
2838 ## """Figure out across how many sites this dataset is spread.
2839 
2840 ## NOTE: This is something we need to figure out per run, since
2841 ## we want to submit harvesting jobs per run.
2842 
2843 ## Basically three things can happen with a given dataset:
2844 ## - the whole dataset is available on a single site,
2845 ## - the whole dataset is available (mirrored) at multiple sites,
2846 ## - the dataset is spread across multiple sites and there is no
2847 ## single site containing the full dataset in one place.
2848 
2849 ## NOTE: If all goes well, it should not be possible that
2850 ## anything but a _full_ dataset is mirrored. So we ignore the
2851 ## possibility in which for example one site contains the full
2852 ## dataset and two others mirror half of it.
2853 ## ANOTHER NOTE: According to some people this last case _could_
2854 ## actually happen. I will not design for it, but make sure it
2855 ## ends up as a false negative, in which case we just lose some
2856 ## efficiency and treat the dataset (unnecessarily) as
2857 ## spread-out.
2858 
2859 ## We don't really care about the first two possibilities, but in
2860 ## the third case we need to make sure to run the harvesting in
2861 ## two-step mode.
2862 
2863 ## This method checks with DBS which of the above cases is true
2864 ## for the dataset name given, and returns a 1 for the first two
2865 ## cases, and the number of sites across which the dataset is
2866 ## spread for the third case.
2867 
2868 ## The way in which this is done is by asking how many files each
2869 ## site has for the dataset. In the first case there is only one
2870 ## site, in the second case all sites should have the same number
2871 ## of files (i.e. the total number of files in the dataset) and
2872 ## in the third case the file counts from all sites should add up
2873 ## to the total file count for the dataset.
2874 
2875 ## """
2876 
2877 ## # DEBUG DEBUG DEBUG
2878 ## # If we get here DBS should have been set up already.
2879 ## assert not self.dbs_api is None
2880 ## # DEBUG DEBUG DEBUG end
2881 
2882 ## api = self.dbs_api
2883 ## dbs_query = "find run, run.numevents, site, file.count " \
2884 ## "where dataset = %s " \
2885 ## "and dataset.status = VALID" % \
2886 ## dataset_name
2887 ## try:
2888 ## api_result = api.executeQuery(dbs_query)
2889 ## except DbsApiException:
2890 ## msg = "ERROR: Could not execute DBS query"
2891 ## self.logger.fatal(msg)
2892 ## raise Error(msg)
2893 
2894 ## # Index things by run number. No cross-check is done to make
2895 ## # sure we get results for each and every run in the
2896 ## # dataset. I'm not sure this would make sense since we'd be
2897 ## # cross-checking DBS info with DBS info anyway. Note that we
2898 ## # use the file count per site to see if we're dealing with an
2899 ## # incomplete vs. a mirrored dataset.
2900 ## sample_info = {}
2901 ## try:
2902 ## class Handler(xml.sax.handler.ContentHandler):
2903 ## def startElement(self, name, attrs):
2904 ## if name == "result":
2905 ## run_number = int(attrs["RUNS_RUNNUMBER"])
2906 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
2907 ## file_count = int(attrs["COUNT_FILES"])
2908 ## # BUG BUG BUG
2909 ## # Doh! For some reason DBS never returns any other
2910 ## # event count than zero.
2911 ## event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
2912 ## # BUG BUG BUG end
2913 ## info = (site_name, file_count, event_count)
2914 ## try:
2915 ## sample_info[run_number].append(info)
2916 ## except KeyError:
2917 ## sample_info[run_number] = [info]
2918 ## xml.sax.parseString(api_result, Handler())
2919 ## except SAXParseException:
2920 ## msg = "ERROR: Could not parse DBS server output"
2921 ## self.logger.fatal(msg)
2922 ## raise Error(msg)
2923 
2924 ## # Now translate this into a slightly more usable mapping.
2925 ## sites = {}
2926 ## for (run_number, site_info) in six.iteritems(sample_info):
2927 ## # Quick-n-dirty trick to see if all file counts are the
2928 ## # same.
2929 ## unique_file_counts = set([i[1] for i in site_info])
2930 ## if len(unique_file_counts) == 1:
2931 ## # Okay, so this must be a mirrored dataset.
2932 ## # We have to pick one but we have to be careful. We
2933 ## # cannot submit to things like a T0, a T1, or CAF.
2934 ## site_names = [self.pick_a_site([i[0] for i in site_info])]
2935 ## nevents = [site_info[0][2]]
2936 ## else:
2937 ## # Looks like this is a spread-out sample.
2938 ## site_names = [i[0] for i in site_info]
2939 ## nevents = [i[2] for i in site_info]
2940 ## sites[run_number] = zip(site_names, nevents)
2941 
2942 ## self.logger.debug("Sample `%s' spread is:" % dataset_name)
2943 ## run_numbers = sites.keys()
2944 ## run_numbers.sort()
2945 ## for run_number in run_numbers:
2946 ## self.logger.debug(" run # %6d: %d sites (%s)" % \
2947 ## (run_number,
2948 ## len(sites[run_number]),
2949 ## ", ".join([i[0] for i in sites[run_number]])))
2950 
2951 ## # End of dbs_check_dataset_spread.
2952 ## return sites
2953 
2954 ## # DEBUG DEBUG DEBUG
2955 ## # Just kept for debugging now.
2956 ## def dbs_check_dataset_spread_old(self, dataset_name):
2957 ## """Figure out across how many sites this dataset is spread.
2958 
2959 ## NOTE: This is something we need to figure out per run, since
2960 ## we want to submit harvesting jobs per run.
2961 
2962 ## Basically three things can happen with a given dataset:
2963 ## - the whole dataset is available on a single site,
2964 ## - the whole dataset is available (mirrored) at multiple sites,
2965 ## - the dataset is spread across multiple sites and there is no
2966 ## single site containing the full dataset in one place.
2967 
2968 ## NOTE: If all goes well, it should not be possible that
2969 ## anything but a _full_ dataset is mirrored. So we ignore the
2970 ## possibility in which for example one site contains the full
2971 ## dataset and two others mirror half of it.
2972 ## ANOTHER NOTE: According to some people this last case _could_
2973 ## actually happen. I will not design for it, but make sure it
2974 ## ends up as a false negative, in which case we just lose some
2975 ## efficiency and treat the dataset (unnecessarily) as
2976 ## spread-out.
2977 
2978 ## We don't really care about the first two possibilities, but in
2979 ## the third case we need to make sure to run the harvesting in
2980 ## two-step mode.
2981 
2982 ## This method checks with DBS which of the above cases is true
2983 ## for the dataset name given, and returns a 1 for the first two
2984 ## cases, and the number of sites across which the dataset is
2985 ## spread for the third case.
2986 
2987 ## The way in which this is done is by asking how many files each
2988 ## site has for the dataset. In the first case there is only one
2989 ## site, in the second case all sites should have the same number
2990 ## of files (i.e. the total number of files in the dataset) and
2991 ## in the third case the file counts from all sites should add up
2992 ## to the total file count for the dataset.
2993 
2994 ## """
2995 
2996 ## # DEBUG DEBUG DEBUG
2997 ## # If we get here DBS should have been set up already.
2998 ## assert not self.dbs_api is None
2999 ## # DEBUG DEBUG DEBUG end
3000 
3001 ## api = self.dbs_api
3002 ## dbs_query = "find run, run.numevents, site, file.count " \
3003 ## "where dataset = %s " \
3004 ## "and dataset.status = VALID" % \
3005 ## dataset_name
3006 ## try:
3007 ## api_result = api.executeQuery(dbs_query)
3008 ## except DbsApiException:
3009 ## msg = "ERROR: Could not execute DBS query"
3010 ## self.logger.fatal(msg)
3011 ## raise Error(msg)
3012 
3013 ## # Index things by run number. No cross-check is done to make
3014 ## # sure we get results for each and every run in the
3015 ## # dataset. I'm not sure this would make sense since we'd be
3016 ## # cross-checking DBS info with DBS info anyway. Note that we
3017 ## # use the file count per site to see if we're dealing with an
3018 ## # incomplete vs. a mirrored dataset.
3019 ## sample_info = {}
3020 ## try:
3021 ## class Handler(xml.sax.handler.ContentHandler):
3022 ## def startElement(self, name, attrs):
3023 ## if name == "result":
3024 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3025 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
3026 ## file_count = int(attrs["COUNT_FILES"])
3027 ## # BUG BUG BUG
3028 ## # Doh! For some reason DBS never returns any other
3029 ## # event count than zero.
3030 ## event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
3031 ## # BUG BUG BUG end
3032 ## info = (site_name, file_count, event_count)
3033 ## try:
3034 ## sample_info[run_number].append(info)
3035 ## except KeyError:
3036 ## sample_info[run_number] = [info]
3037 ## xml.sax.parseString(api_result, Handler())
3038 ## except SAXParseException:
3039 ## msg = "ERROR: Could not parse DBS server output"
3040 ## self.logger.fatal(msg)
3041 ## raise Error(msg)
3042 
3043 ## # Now translate this into a slightly more usable mapping.
3044 ## sites = {}
3045 ## for (run_number, site_info) in six.iteritems(sample_info):
3046 ## # Quick-n-dirty trick to see if all file counts are the
3047 ## # same.
3048 ## unique_file_counts = set([i[1] for i in site_info])
3049 ## if len(unique_file_counts) == 1:
3050 ## # Okay, so this must be a mirrored dataset.
3051 ## # We have to pick one but we have to be careful. We
3052 ## # cannot submit to things like a T0, a T1, or CAF.
3053 ## site_names = [self.pick_a_site([i[0] for i in site_info])]
3054 ## nevents = [site_info[0][2]]
3055 ## else:
3056 ## # Looks like this is a spread-out sample.
3057 ## site_names = [i[0] for i in site_info]
3058 ## nevents = [i[2] for i in site_info]
3059 ## sites[run_number] = zip(site_names, nevents)
3060 
3061 ## self.logger.debug("Sample `%s' spread is:" % dataset_name)
3062 ## run_numbers = sites.keys()
3063 ## run_numbers.sort()
3064 ## for run_number in run_numbers:
3065 ## self.logger.debug(" run # %6d: %d site(s) (%s)" % \
3066 ## (run_number,
3067 ## len(sites[run_number]),
3068 ## ", ".join([i[0] for i in sites[run_number]])))
3069 
3070 ## # End of dbs_check_dataset_spread_old.
3071 ## return sites
3072 ## # DEBUG DEBUG DEBUG end
3073 
3074  ##########
3075 
3076  def dbs_check_dataset_spread(self, dataset_name):
3077  """Figure out the number of events in each run of this dataset.
3078 
3079  This is a more efficient way of doing this than calling
3080  dbs_resolve_number_of_events for each run.
3081 
3082  """
3083 
3084  self.logger.debug("Checking spread of dataset `%s'" % dataset_name)
3085 
3086  # DEBUG DEBUG DEBUG
3087  # If we get here DBS should have been set up already.
3088  assert not self.dbs_api is None
3089  # DEBUG DEBUG DEBUG end
3090 
3091  api = self.dbs_api
3092  dbs_query = "find run.number, site, file.name, file.numevents " \
3093  "where dataset = %s " \
3094  "and dataset.status = VALID" % \
3095  dataset_name
3096  try:
3097  api_result = api.executeQuery(dbs_query)
3098  except DBSAPI.dbsApiException.DbsApiException:
3099  msg = "ERROR: Could not execute DBS query"
3100  self.logger.fatal(msg)
3101  raise Error(msg)
3102 
3103  handler = DBSXMLHandler(["run.number", "site", "file.name", "file.numevents"])
3104  parser = xml.sax.make_parser()
3105  parser.setContentHandler(handler)
3106 
3107  try:
3108  # OBSOLETE OBSOLETE OBSOLETE
3109 ## class Handler(xml.sax.handler.ContentHandler):
3110 ## def startElement(self, name, attrs):
3111 ## if name == "result":
3112 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
3113 ## # TODO TODO TODO
3114 ## # Ugly hack to get around cases like this:
3115 ## # $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3116 ## # Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3117 ## # Processing ... \
3118 ## # PATH STORAGEELEMENT_SENAME COUNT_FILES
3119 ## # _________________________________________________________________________________
3120 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO 1
3121 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO cmssrm.fnal.gov 12
3122 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO srm-cms.cern.ch 12
3123 ## if len(site_name) < 1:
3124 ## return
3125 ## # TODO TODO TODO end
3126 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3127 ## file_name = str(attrs["FILES_LOGICALFILENAME"])
3128 ## nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3129 ## # I know, this is a bit of a kludge.
3130 ## if not files_info.has_key(run_number):
3131 ## # New run.
3132 ## files_info[run_number] = {}
3133 ## files_info[run_number][file_name] = (nevents,
3134 ## [site_name])
3135 ## elif not files_info[run_number].has_key(file_name):
3136 ## # New file for a known run.
3137 ## files_info[run_number][file_name] = (nevents,
3138 ## [site_name])
3139 ## else:
3140 ## # New entry for a known file for a known run.
3141 ## # DEBUG DEBUG DEBUG
3142 ## # Each file should have the same number of
3143 ## # events independent of the site it's at.
3144 ## assert nevents == files_info[run_number][file_name][0]
3145 ## # DEBUG DEBUG DEBUG end
3146 ## files_info[run_number][file_name][1].append(site_name)
3147  # OBSOLETE OBSOLETE OBSOLETE end
3148  xml.sax.parseString(api_result, handler)
3149  except SAXParseException:
3150  msg = "ERROR: Could not parse DBS server output"
3151  self.logger.fatal(msg)
3152  raise Error(msg)
3153 
3154  # DEBUG DEBUG DEBUG
3155  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
3156  # DEBUG DEBUG DEBUG end
3157 
3158  # Now reshuffle all results a bit so we can more easily use
3159  # them later on. (Remember that all arrays in the results
3160  # should have equal length.)
3161  files_info = {}
3162  for (index, site_name) in enumerate(handler.results["site"]):
3163  # Ugly hack to get around cases like this:
3164  # $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3165  # Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3166  # Processing ... \
3167  # PATH STORAGEELEMENT_SENAME COUNT_FILES
3168  # _________________________________________________________________________________
3169  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO 1
3170  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO cmssrm.fnal.gov 12
3171  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO srm-cms.cern.ch 12
3172  if len(site_name) < 1:
3173  continue
3174  run_number = int(handler.results["run.number"][index])
3175  file_name = handler.results["file.name"][index]
3176  nevents = int(handler.results["file.numevents"][index])
3177 
3178  # I know, this is a bit of a kludge.
3179  if run_number not in files_info:
3180  # New run.
3181  files_info[run_number] = {}
3182  files_info[run_number][file_name] = (nevents,
3183  [site_name])
3184  elif file_name not in files_info[run_number]:
3185  # New file for a known run.
3186  files_info[run_number][file_name] = (nevents,
3187  [site_name])
3188  else:
3189  # New entry for a known file for a known run.
3190  # DEBUG DEBUG DEBUG
3191  # Each file should have the same number of
3192  # events independent of the site it's at.
3193  assert nevents == files_info[run_number][file_name][0]
3194  # DEBUG DEBUG DEBUG end
3195  files_info[run_number][file_name][1].append(site_name)
3196 
3197  # Remove any information for files that are not available
3198  # anywhere. NOTE: After introducing the ugly hack above, this
3199  # is a bit redundant, but let's keep it for the moment.
3200  for run_number in files_info.keys():
3201  files_without_sites = [i for (i, j) in \
3202  files_info[run_number].items() \
3203  if len(j[1]) < 1]
3204  if len(files_without_sites) > 0:
3205  self.logger.warning("Removing %d file(s)" \
3206  " with empty site names" % \
3207  len(files_without_sites))
3208  for file_name in files_without_sites:
3209  del files_info[run_number][file_name]
3210  # files_info[run_number][file_name] = (files_info \
3211  # [run_number] \
3212  # [file_name][0], [])
3213 
3214  # And another bit of a kludge.
3215  num_events_catalog = {}
3216  for run_number in files_info.keys():
3217  site_names = list(set([j for i in files_info[run_number].values() for j in i[1]]))
3218 
3219  # NOTE: The term `mirrored' does not have the usual
3220  # meaning here. It basically means that we can apply
3221  # single-step harvesting.
3222  mirrored = None
3223  if len(site_names) > 1:
3224  # Now we somehow need to figure out if we're dealing
3225  # with a mirrored or a spread-out dataset. The rule we
3226  # use here is that we're dealing with a spread-out
3227  # dataset unless we can find at least one site
3228  # containing exactly the full list of files for this
3229  # dataset that DBS knows about. In that case we just
3230  # use only that site.
3231  all_file_names = files_info[run_number].keys()
3232  all_file_names = set(all_file_names)
3233  sites_with_complete_copies = []
3234  for site_name in site_names:
3235  files_at_site = [i for (i, (j, k)) \
3236  in files_info[run_number].items() \
3237  if site_name in k]
3238  files_at_site = set(files_at_site)
3239  if files_at_site == all_file_names:
3240  sites_with_complete_copies.append(site_name)
3241  if len(sites_with_complete_copies) < 1:
3242  # This dataset/run is available at more than one
3243  # site, but no one has a complete copy. So this is
3244  # a spread-out sample.
3245  mirrored = False
3246  else:
3247  if len(sites_with_complete_copies) > 1:
3248  # This sample is available (and complete) at
3249  # more than one site. Definitely mirrored.
3250  mirrored = True
3251  else:
3252  # This dataset/run is available at more than
3253  # one site and at least one of them has a
3254  # complete copy. Even if this is only a single
3255  # site, let's call this `mirrored' and run the
3256  # single-step harvesting.
3257  mirrored = True
3258 
3259 ## site_names_ref = set(files_info[run_number].values()[0][1])
3260 ## for site_names_tmp in files_info[run_number].values()[1:]:
3261 ## if set(site_names_tmp[1]) != site_names_ref:
3262 ## mirrored = False
3263 ## break
3264 
3265  if mirrored:
3266  self.logger.debug(" -> run appears to be `mirrored'")
3267  else:
3268  self.logger.debug(" -> run appears to be spread-out")
3269 
3270  if mirrored and \
3271  len(sites_with_complete_copies) != len(site_names):
3272  # Remove any references to incomplete sites if we
3273  # have at least one complete site (and if there
3274  # are incomplete sites).
3275  for (file_name, (i, sites)) in files_info[run_number].items():
3276  complete_sites = [site for site in sites \
3277  if site in sites_with_complete_copies]
3278  files_info[run_number][file_name] = (i, complete_sites)
3279  site_names = sites_with_complete_copies
3280 
3281  self.logger.debug(" for run #%d:" % run_number)
3282  num_events_catalog[run_number] = {}
3283  num_events_catalog[run_number]["all_sites"] = sum([i[0] for i in files_info[run_number].values()])
3284  if len(site_names) < 1:
3285  self.logger.debug(" run is not available at any site")
3286  self.logger.debug(" (but should contain %d events)" % \
3287  num_events_catalog[run_number]["all_sites"])
3288  else:
3289  self.logger.debug(" at all sites combined there are %d events" % \
3290  num_events_catalog[run_number]["all_sites"])
3291  for site_name in site_names:
3292  num_events_catalog[run_number][site_name] = sum([i[0] for i in files_info[run_number].values() if site_name in i[1]])
3293  self.logger.debug(" at site `%s' there are %d events" % \
3294  (site_name, num_events_catalog[run_number][site_name]))
3295  num_events_catalog[run_number]["mirrored"] = mirrored
3296 
3297  # End of dbs_check_dataset_spread.
3298  return num_events_catalog
3299 
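# Illustration: the `mirrored' decision above boils down to `a run can
# be harvested in a single step as soon as at least one site hosts every
# file DBS lists for it'. A stand-alone sketch (hypothetical helper);
# `files_for_run' has the shape of files_info[run_number] built above,
# i.e. file name -> (nevents, [site, ...]):
def sites_with_complete_copy(files_for_run):
    """Return the sites that host all files of this run."""
    all_files = set(files_for_run.keys())
    all_sites = set()
    for (nevents, sites) in files_for_run.values():
        all_sites.update(sites)
    complete = []
    for site in all_sites:
        files_here = set([name for (name, (nevents, sites))
                          in files_for_run.items()
                          if site in sites])
        if files_here == all_files:
            complete.append(site)
    return complete
# A run counts as `mirrored' if this list is non-empty.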
3300  # Beginning of old version.
3301 ## def dbs_check_dataset_num_events(self, dataset_name):
3302 ## """Figure out the number of events in each run of this dataset.
3303 
3304 ## This is a more efficient way of doing this than calling
3305 ## dbs_resolve_number_of_events for each run.
3306 
3307 ## # BUG BUG BUG
3308 ## # This might very well not work at all for spread-out samples. (?)
3309 ## # BUG BUG BUG end
3310 
3311 ## """
3312 
3313 ## # DEBUG DEBUG DEBUG
3314 ## # If we get here DBS should have been set up already.
3315 ## assert not self.dbs_api is None
3316 ## # DEBUG DEBUG DEBUG end
3317 
3318 ## api = self.dbs_api
3319 ## dbs_query = "find run.number, file.name, file.numevents where dataset = %s " \
3320 ## "and dataset.status = VALID" % \
3321 ## dataset_name
3322 ## try:
3323 ## api_result = api.executeQuery(dbs_query)
3324 ## except DbsApiException:
3325 ## msg = "ERROR: Could not execute DBS query"
3326 ## self.logger.fatal(msg)
3327 ## raise Error(msg)
3328 
3329 ## try:
3330 ## files_info = {}
3331 ## class Handler(xml.sax.handler.ContentHandler):
3332 ## def startElement(self, name, attrs):
3333 ## if name == "result":
3334 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3335 ## file_name = str(attrs["FILES_LOGICALFILENAME"])
3336 ## nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3337 ## try:
3338 ## files_info[run_number][file_name] = nevents
3339 ## except KeyError:
3340 ## files_info[run_number] = {file_name: nevents}
3341 ## xml.sax.parseString(api_result, Handler())
3342 ## except SAXParseException:
3343 ## msg = "ERROR: Could not parse DBS server output"
3344 ## self.logger.fatal(msg)
3345 ## raise Error(msg)
3346 
3347 ## num_events_catalog = {}
3348 ## for run_number in files_info.keys():
3349 ## num_events_catalog[run_number] = sum(files_info[run_number].values())
3350 
3351 ## # End of dbs_check_dataset_num_events.
3352 ## return num_events_catalog
3353  # End of old version.
3354 
3355  ##########
3356 
3357  def build_dataset_list(self, input_method, input_name):
3358  """Build a list of all datasets to be processed.
3359 
3360  """
3361 
3362  dataset_names = []
3363 
3364  # It may be, but only for the list of datasets to ignore, that
3365  # the input method and name are None because nothing was
3366  # specified. In that case just an empty list is returned.
3367  if input_method is None:
3368  pass
3369  elif input_method == "dataset":
3370  # Input comes from a dataset name directly on the command
3371  # line. But, this can also contain wildcards so we need
3372  # DBS to translate it conclusively into a list of explicit
3373  # dataset names.
3374  self.logger.info("Asking DBS for dataset names")
3375  dataset_names = self.dbs_resolve_dataset_name(input_name)
3376  elif input_method == "datasetfile":
3377  # In this case a file containing a list of dataset names
3378  # is specified. Still, each line may contain wildcards so
3379  # this step also needs help from DBS.
3380  # NOTE: Lines starting with a `#' are ignored.
3381  self.logger.info("Reading input from list file `%s'" % \
3382  input_name)
3383  try:
3384  listfile = open("/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name, "r")
3385  print("open listfile")
3386  for dataset in listfile:
3387  # Skip empty lines.
3388  dataset_stripped = dataset.strip()
3389  if len(dataset_stripped) < 1:
3390  continue
3391  # Skip lines starting with a `#'.
3392  if dataset_stripped[0] != "#":
3393  dataset_names.extend(self. \
3394  dbs_resolve_dataset_name(dataset_stripped))
3395  listfile.close()
3396  except IOError:
3397  msg = "ERROR: Could not open input list file `%s'" % \
3398  input_name
3399  self.logger.fatal(msg)
3400  raise Error(msg)
3401  else:
3402  # DEBUG DEBUG DEBUG
3403  # We should never get here.
3404  assert False, "Unknown input method `%s'" % input_method
3405  # DEBUG DEBUG DEBUG end
3406 
3407  # Remove duplicates from the dataset list.
3408  # NOTE: There should not be any duplicates in any list coming
3409  # from DBS, but maybe the user provided a list file with less
3410  # care.
3411  # Store for later use.
3412  dataset_names = sorted(set(dataset_names))
3413 
3414 
3415  # End of build_dataset_list.
3416  return dataset_names
3417 
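# For illustration: the kind of list file the `datasetfile' branch above
# expects. Every non-empty line that does not start with `#' is treated
# as a (possibly wildcarded) dataset name and resolved through DBS. The
# names below are placeholders, not real datasets.
##   # datasets_to_harvest.txt
##   /MinimumBias/Commissioning10-*/RECO
##   # temporarily disabled:
##   # /Cosmics/Commissioning10-*/RECO
##   /RelValTTbar/CMSSW_3_8_*-*/GEN-SIM-RECO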
3418  ##########
3419 
 3420  def build_dataset_use_list(self):
 3421  """Build a list of datasets to process.
3422 
3423  """
3424 
3425  self.logger.info("Building list of datasets to consider...")
3426 
3427  input_method = self.input_method["datasets"]["use"]
3428  input_name = self.input_name["datasets"]["use"]
3429  dataset_names = self.build_dataset_list(input_method,
3430  input_name)
3431  self.datasets_to_use = dict(list(zip(dataset_names,
3432  [None] * len(dataset_names))))
3433 
3434  self.logger.info(" found %d dataset(s) to process:" % \
3435  len(dataset_names))
3436  for dataset in dataset_names:
3437  self.logger.info(" `%s'" % dataset)
3438 
3439  # End of build_dataset_use_list.
3440 
3441  ##########
3442 
 3443  def build_dataset_ignore_list(self):
 3444  """Build a list of datasets to ignore.
3445 
3446  NOTE: We should always have a list of datasets to process, but
3447  it may be that we don't have a list of datasets to ignore.
3448 
3449  """
3450 
3451  self.logger.info("Building list of datasets to ignore...")
3452 
3453  input_method = self.input_method["datasets"]["ignore"]
3454  input_name = self.input_name["datasets"]["ignore"]
3455  dataset_names = self.build_dataset_list(input_method,
3456  input_name)
3457  self.datasets_to_ignore = dict(list(zip(dataset_names,
3458  [None] * len(dataset_names))))
3459 
3460  self.logger.info(" found %d dataset(s) to ignore:" % \
3461  len(dataset_names))
3462  for dataset in dataset_names:
3463  self.logger.info(" `%s'" % dataset)
3464 
3465  # End of build_dataset_ignore_list.
3466 
3467  ##########
3468 
3469  def build_runs_list(self, input_method, input_name):
3470 
3471  runs = []
3472 
3473  # A list of runs (either to use or to ignore) is not
3474  # required. This protects against `empty cases.'
3475  if input_method is None:
3476  pass
3477  elif input_method == "runs":
3478  # A list of runs was specified directly from the command
3479  # line.
3480  self.logger.info("Reading list of runs from the " \
3481  "command line")
3482  runs.extend([int(i.strip()) \
3483  for i in input_name.split(",") \
3484  if len(i.strip()) > 0])
3485  elif input_method == "runslistfile":
3486  # We were passed a file containing a list of runs.
3487  self.logger.info("Reading list of runs from file `%s'" % \
3488  input_name)
3489  try:
3490  listfile = open(input_name, "r")
3491  for run in listfile:
3492  # Skip empty lines.
3493  run_stripped = run.strip()
3494  if len(run_stripped) < 1:
3495  continue
3496  # Skip lines starting with a `#'.
3497  if run_stripped[0] != "#":
3498  runs.append(int(run_stripped))
3499  listfile.close()
3500  except IOError:
3501  msg = "ERROR: Could not open input list file `%s'" % \
3502  input_name
3503  self.logger.fatal(msg)
3504  raise Error(msg)
3505 
3506  else:
3507  # DEBUG DEBUG DEBUG
3508  # We should never get here.
3509  assert False, "Unknown input method `%s'" % input_method
3510  # DEBUG DEBUG DEBUG end
3511 
3512  # Remove duplicates, sort and done.
 3513  runs = sorted(set(runs))
3514 
3515  # End of build_runs_list().
3516  return runs
3517 
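# For illustration: the two input forms handled above (run numbers are
# placeholders). From the command line a comma-separated string such as
# "135149, 136035" is accepted; from a run list file one run number per
# line, with empty lines and lines starting with `#' skipped:
##   135149
##   # 135523   (commented out, ignored)
##   136035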
3518  ##########
3519 
 3520  def build_runs_use_list(self):
 3521  """Build a list of runs to process.
3522 
3523  """
3524 
3525  self.logger.info("Building list of runs to consider...")
3526 
3527  input_method = self.input_method["runs"]["use"]
3528  input_name = self.input_name["runs"]["use"]
3529  runs = self.build_runs_list(input_method, input_name)
3530  self.runs_to_use = dict(list(zip(runs, [None] * len(runs))))
3531 
3532  self.logger.info(" found %d run(s) to process:" % \
3533  len(runs))
3534  if len(runs) > 0:
3535  self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3536 
 3537  # End of build_runs_use_list().
3538 
3539  ##########
3540 
 3541  def build_runs_ignore_list(self):
 3542  """Build a list of runs to ignore.
3543 
3544  NOTE: We should always have a list of runs to process, but
3545  it may be that we don't have a list of runs to ignore.
3546 
3547  """
3548 
3549  self.logger.info("Building list of runs to ignore...")
3550 
3551  input_method = self.input_method["runs"]["ignore"]
3552  input_name = self.input_name["runs"]["ignore"]
3553  runs = self.build_runs_list(input_method, input_name)
3554  self.runs_to_ignore = dict(list(zip(runs, [None] * len(runs))))
3555 
3556  self.logger.info(" found %d run(s) to ignore:" % \
3557  len(runs))
3558  if len(runs) > 0:
3559  self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3560 
3561  # End of build_runs_ignore_list().
3562 
3563  ##########
3564 
 3565  def process_dataset_ignore_list(self):
 3566  """Update the list of datasets taking into account the ones to
3567  ignore.
3568 
3569  Both lists have been generated before from DBS and both are
3570  assumed to be unique.
3571 
3572  NOTE: The advantage of creating the ignore list from DBS (in
3573  case a regexp is given) and matching that instead of directly
3574  matching the ignore criterion against the list of datasets (to
3575  consider) built from DBS is that in the former case we're sure
3576  that all regexps are treated exactly as DBS would have done
3577  without the cmsHarvester.
3578 
3579  NOTE: This only removes complete samples. Exclusion of single
3580  runs is done by the book keeping. So the assumption is that a
3581  user never wants to harvest just part (i.e. n out of N runs)
3582  of a sample.
3583 
3584  """
3585 
3586  self.logger.info("Processing list of datasets to ignore...")
3587 
3588  self.logger.debug("Before processing ignore list there are %d " \
3589  "datasets in the list to be processed" % \
3590  len(self.datasets_to_use))
3591 
3592  # Simple approach: just loop and search.
3593  dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3594  for dataset_name in self.datasets_to_use.keys():
3595  if dataset_name in self.datasets_to_ignore.keys():
3596  del dataset_names_filtered[dataset_name]
3597 
3598  self.logger.info(" --> Removed %d dataset(s)" % \
3599  (len(self.datasets_to_use) -
3600  len(dataset_names_filtered)))
3601 
3602  self.datasets_to_use = dataset_names_filtered
3603 
3604  self.logger.debug("After processing ignore list there are %d " \
3605  "datasets in the list to be processed" % \
3606  len(self.datasets_to_use))
3607 
3608  # End of process_dataset_ignore_list.
3609 
3610  ##########
3611 
 3612  def process_runs_use_and_ignore_lists(self):
 3613 
3614  self.logger.info("Processing list of runs to use and ignore...")
3615 
3616  # This basically adds all runs in a dataset to be processed,
3617  # except for any runs that are not specified in the `to use'
3618  # list and any runs that are specified in the `to ignore'
3619  # list.
3620 
3621  # NOTE: It is assumed that those lists make sense. The input
3622  # should be checked against e.g. overlapping `use' and
3623  # `ignore' lists.
3624 
3625  runs_to_use = self.runs_to_use
3626  runs_to_ignore = self.runs_to_ignore
3627 
3628  for dataset_name in self.datasets_to_use:
3629  runs_in_dataset = self.datasets_information[dataset_name]["runs"]
3630 
3631  # First some sanity checks.
3632  runs_to_use_tmp = []
3633  for run in runs_to_use:
3634  if not run in runs_in_dataset:
3635  self.logger.warning("Dataset `%s' does not contain " \
3636  "requested run %d " \
3637  "--> ignoring `use' of this run" % \
3638  (dataset_name, run))
3639  else:
3640  runs_to_use_tmp.append(run)
3641 
3642  if len(runs_to_use) > 0:
3643  runs = runs_to_use_tmp
3644  self.logger.info("Using %d out of %d runs " \
3645  "of dataset `%s'" % \
3646  (len(runs), len(runs_in_dataset),
3647  dataset_name))
3648  else:
3649  runs = runs_in_dataset
3650 
3651  if len(runs_to_ignore) > 0:
3652  runs_tmp = []
3653  for run in runs:
3654  if not run in runs_to_ignore:
3655  runs_tmp.append(run)
3656  self.logger.info("Ignoring %d out of %d runs " \
3657  "of dataset `%s'" % \
3658  (len(runs)- len(runs_tmp),
3659  len(runs_in_dataset),
3660  dataset_name))
3661  runs = runs_tmp
3662 
3663  if self.todofile != "YourToDofile.txt":
3664  runs_todo = []
3665  print("Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile)
3666  cmd="grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3667  (status, output)=commands.getstatusoutput(cmd)
3668  for run in runs:
3669  run_str="%s" %run
3670  if run_str in output:
3671  runs_todo.append(run)
3672  self.logger.info("Using %d runs " \
3673  "of dataset `%s'" % \
3674  (len(runs_todo),
3675  dataset_name))
3676  runs=runs_todo
3677 
3678  Json_runs = []
3679  if self.Jsonfilename != "YourJSON.txt":
3680  good_runs = []
3681  self.Jsonlumi = True
3682  # We were passed a Jsonfile containing a dictionary of
 3683  # run/lumisection pairs.
3684  self.logger.info("Reading runs and lumisections from file `%s'" % \
3685  self.Jsonfilename)
3686  try:
3687  Jsonfile = open(self.Jsonfilename, "r")
3688  for names in Jsonfile:
3689  dictNames= eval(str(names))
3690  for key in dictNames:
3691  intkey=int(key)
3692  Json_runs.append(intkey)
3693  Jsonfile.close()
3694  except IOError:
3695  msg = "ERROR: Could not open Jsonfile `%s'" % \
 3696  self.Jsonfilename
3697  self.logger.fatal(msg)
3698  raise Error(msg)
3699  for run in runs:
3700  if run in Json_runs:
3701  good_runs.append(run)
3702  self.logger.info("Using %d runs " \
3703  "of dataset `%s'" % \
3704  (len(good_runs),
3705  dataset_name))
3706  runs=good_runs
3707  if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):
3708  good_runs = []
3709  # We were passed a Jsonfile containing a dictionary of
 3710  # run/lumisection pairs.
3711  self.logger.info("Reading runs from file `%s'" % \
3712  self.Jsonrunfilename)
3713  try:
3714  Jsonfile = open(self.Jsonrunfilename, "r")
3715  for names in Jsonfile:
3716  dictNames= eval(str(names))
3717  for key in dictNames:
3718  intkey=int(key)
3719  Json_runs.append(intkey)
3720  Jsonfile.close()
3721  except IOError:
3722  msg = "ERROR: Could not open Jsonfile `%s'" % \
 3723  self.Jsonrunfilename
3724  self.logger.fatal(msg)
3725  raise Error(msg)
3726  for run in runs:
3727  if run in Json_runs:
3728  good_runs.append(run)
3729  self.logger.info("Using %d runs " \
3730  "of dataset `%s'" % \
3731  (len(good_runs),
3732  dataset_name))
3733  runs=good_runs
3734 
3735  self.datasets_to_use[dataset_name] = runs
3736 
3737  # End of process_runs_use_and_ignore_lists().
3738 
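# Sketch (values made up) of the JSON input consumed by the Jsonfilename
# and Jsonrunfilename blocks above: a dictionary keyed by run number,
# with lumi-section ranges as values. Only the keys (the run numbers)
# are used here to filter the run list.
##   {"135149": [[1, 50], [61, 75]], "136035": [[1, 30]]}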
3739  ##########
3740 
 3741  def singlify_datasets(self):
 3742  """Remove all but the largest part of all datasets.
3743 
3744  This allows us to harvest at least part of these datasets
3745  using single-step harvesting until the two-step approach
3746  works.
3747 
3748  """
3749 
3750  # DEBUG DEBUG DEBUG
3751  assert self.harvesting_mode == "single-step-allow-partial"
3752  # DEBUG DEBUG DEBUG end
3753 
3754  for dataset_name in self.datasets_to_use:
3755  for run_number in self.datasets_information[dataset_name]["runs"]:
3756  max_events = max(self.datasets_information[dataset_name]["sites"][run_number].values())
3757  sites_with_max_events = [i[0] for i in self.datasets_information[dataset_name]["sites"][run_number].items() if i[1] == max_events]
3758  self.logger.warning("Singlifying dataset `%s', " \
3759  "run %d" % \
3760  (dataset_name, run_number))
3761  cmssw_version = self.datasets_information[dataset_name] \
3762  ["cmssw_version"]
3763  selected_site = self.pick_a_site(sites_with_max_events,
3764  cmssw_version)
3765 
3766  # Let's tell the user that we're manhandling this dataset.
3767  nevents_old = self.datasets_information[dataset_name]["num_events"][run_number]
3768  self.logger.warning(" --> " \
3769  "only harvesting partial statistics: " \
 3770  "%d out of %d events (%5.1f%%) " \
3771  "at site `%s'" % \
3772  (max_events,
3773  nevents_old,
3774  100. * max_events / nevents_old,
3775  selected_site))
3776  self.logger.warning("!!! Please note that the number of " \
3777  "events in the output path name will " \
3778  "NOT reflect the actual statistics in " \
3779  "the harvested results !!!")
3780 
3781  # We found the site with the highest statistics and
3782  # the corresponding number of events. (CRAB gets upset
3783  # if we ask for more events than there are at a given
3784  # site.) Now update this information in our main
3785  # datasets_information variable.
3786  self.datasets_information[dataset_name]["sites"][run_number] = {selected_site: max_events}
3787  self.datasets_information[dataset_name]["num_events"][run_number] = max_events
3788  #self.datasets_information[dataset_name]["sites"][run_number] = [selected_site]
3789 
3790  # End of singlify_datasets.
3791 
3792  ##########
3793 
 3794  def check_dataset_list(self):
 3795  """Check list of dataset names for impossible ones.
3796 
3797  Two kinds of checks are done:
3798  - Checks for things that do not make sense. These lead to
3799  errors and skipped datasets.
3800  - Sanity checks. For these warnings are issued but the user is
3801  considered to be the authoritative expert.
3802 
3803  Checks performed:
3804  - The CMSSW version encoded in the dataset name should match
3805  self.cmssw_version. This is critical.
3806  - There should be some events in the dataset/run. This is
3807  critical in the sense that CRAB refuses to create jobs for
3808  zero events. And yes, this does happen in practice. E.g. the
3809  reprocessed CRAFT08 datasets contain runs with zero events.
3810  - A cursory check is performed to see if the harvesting type
3811  makes sense for the data type. This should prevent the user
3812  from inadvertently running RelVal for data.
3813  - It is not possible to run single-step harvesting jobs on
3814  samples that are not fully contained at a single site.
3815  - Each dataset/run has to be available at at least one site.
3816 
3817  """
3818 
3819  self.logger.info("Performing sanity checks on dataset list...")
3820 
3821  dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3822 
3823  for dataset_name in self.datasets_to_use.keys():
3824 
3825  # Check CMSSW version.
3826  version_from_dataset = self.datasets_information[dataset_name] \
3827  ["cmssw_version"]
3828  if version_from_dataset != self.cmssw_version:
3829  msg = " CMSSW version mismatch for dataset `%s' " \
3830  "(%s vs. %s)" % \
3831  (dataset_name,
3832  self.cmssw_version, version_from_dataset)
3833  if self.force_running:
3834  # Expert mode: just warn, then continue.
3835  self.logger.warning("%s " \
3836  "--> `force mode' active: " \
3837  "run anyway" % msg)
3838  else:
3839  del dataset_names_after_checks[dataset_name]
3840  self.logger.warning("%s " \
3841  "--> skipping" % msg)
3842  continue
3843 
3844  ###
3845 
3846  # Check that the harvesting type makes sense for the
3847  # sample. E.g. normally one would not run the DQMOffline
3848  # harvesting on Monte Carlo.
3849  # TODO TODO TODO
3850  # This should be further refined.
3851  suspicious = False
3852  datatype = self.datasets_information[dataset_name]["datatype"]
3853  if datatype == "data":
3854  # Normally only DQM harvesting is run on data.
3855  if self.harvesting_type != "DQMOffline":
3856  suspicious = True
3857  elif datatype == "mc":
3858  if self.harvesting_type == "DQMOffline":
3859  suspicious = True
3860  else:
3861  # Doh!
3862  assert False, "ERROR Impossible data type `%s' " \
3863  "for dataset `%s'" % \
3864  (datatype, dataset_name)
3865  if suspicious:
3866  msg = " Normally one does not run `%s' harvesting " \
3867  "on %s samples, are you sure?" % \
3868  (self.harvesting_type, datatype)
3869  if self.force_running:
3870  self.logger.warning("%s " \
3871  "--> `force mode' active: " \
3872  "run anyway" % msg)
3873  else:
3874  del dataset_names_after_checks[dataset_name]
3875  self.logger.warning("%s " \
3876  "--> skipping" % msg)
3877  continue
3878 
3879  # TODO TODO TODO end
3880 
3881  ###
3882 
3883  # BUG BUG BUG
3884  # For the moment, due to a problem with DBS, I cannot
3885  # figure out the GlobalTag for data by myself. (For MC
3886  # it's no problem.) This means that unless a GlobalTag was
3887  # specified from the command line, we will have to skip
3888  # any data datasets.
3889 
3890  if datatype == "data":
3891  if self.globaltag is None:
3892  msg = "For data datasets (like `%s') " \
3893  "we need a GlobalTag" % \
3894  dataset_name
3895  del dataset_names_after_checks[dataset_name]
3896  self.logger.warning("%s " \
3897  "--> skipping" % msg)
3898  continue
3899 
3900  # BUG BUG BUG end
3901 
3902  ###
3903 
3904  # Check if the GlobalTag exists and (if we're using
3905  # reference histograms) if it's ready to be used with
3906  # reference histograms.
3907  globaltag = self.datasets_information[dataset_name]["globaltag"]
3908  if not globaltag in self.globaltag_check_cache:
3909  if self.check_globaltag(globaltag):
3910  self.globaltag_check_cache.append(globaltag)
3911  else:
3912  msg = "Something is wrong with GlobalTag `%s' " \
3913  "used by dataset `%s'!" % \
3914  (globaltag, dataset_name)
3915  if self.use_ref_hists:
3916  msg += "\n(Either it does not exist or it " \
3917  "does not contain the required key to " \
3918  "be used with reference histograms.)"
3919  else:
3920  msg += "\n(It probably just does not exist.)"
3921  self.logger.fatal(msg)
3922  raise Usage(msg)
3923 
3924  ###
3925 
3926  # Require that each run is available at least somewhere.
3927  runs_without_sites = [i for (i, j) in \
3928  self.datasets_information[dataset_name] \
3929  ["sites"].items() \
3930  if len(j) < 1 and \
3931  i in self.datasets_to_use[dataset_name]]
3932  if len(runs_without_sites) > 0:
3933  for run_without_sites in runs_without_sites:
3934  try:
3935  dataset_names_after_checks[dataset_name].remove(run_without_sites)
3936  except KeyError:
3937  pass
3938  self.logger.warning(" removed %d unavailable run(s) " \
3939  "from dataset `%s'" % \
3940  (len(runs_without_sites), dataset_name))
3941  self.logger.debug(" (%s)" % \
3942  ", ".join([str(i) for i in \
3943  runs_without_sites]))
3944 
3945  ###
3946 
3947  # Unless we're running two-step harvesting: only allow
3948  # samples located on a single site.
3949  if not self.harvesting_mode == "two-step":
3950  for run_number in self.datasets_to_use[dataset_name]:
3951  # DEBUG DEBUG DEBUG
3952 ## if self.datasets_information[dataset_name]["num_events"][run_number] != 0:
3953 ## pdb.set_trace()
3954  # DEBUG DEBUG DEBUG end
3955  num_sites = len(self.datasets_information[dataset_name] \
3956  ["sites"][run_number])
3957  if num_sites > 1 and \
3958  not self.datasets_information[dataset_name] \
3959  ["mirrored"][run_number]:
3960  # Cannot do this with a single-step job, not
3961  # even in force mode. It just does not make
3962  # sense.
3963  msg = " Dataset `%s', run %d is spread across more " \
3964  "than one site.\n" \
3965  " Cannot run single-step harvesting on " \
3966  "samples spread across multiple sites" % \
3967  (dataset_name, run_number)
3968  try:
3969  dataset_names_after_checks[dataset_name].remove(run_number)
3970  except KeyError:
3971  pass
3972  self.logger.warning("%s " \
3973  "--> skipping" % msg)
3974 
3975  ###
3976 
3977  # Require that the dataset/run is non-empty.
3978  # NOTE: To avoid reconsidering empty runs/datasets next
3979  # time around, we do include them in the book keeping.
3980  # BUG BUG BUG
3981  # This should sum only over the runs that we use!
3982  tmp = [j for (i, j) in self.datasets_information \
3983  [dataset_name]["num_events"].items() \
3984  if i in self.datasets_to_use[dataset_name]]
3985  num_events_dataset = sum(tmp)
3986  # BUG BUG BUG end
3987  if num_events_dataset < 1:
3988  msg = " dataset `%s' is empty" % dataset_name
3989  del dataset_names_after_checks[dataset_name]
3990  self.logger.warning("%s " \
3991  "--> skipping" % msg)
3992  # Update the book keeping with all the runs in the dataset.
3993  # DEBUG DEBUG DEBUG
3994  #assert set([j for (i, j) in self.datasets_information \
3995  # [dataset_name]["num_events"].items() \
3996  # if i in self.datasets_to_use[dataset_name]]) == \
3997  # set([0])
3998  # DEBUG DEBUG DEBUG end
3999  #self.book_keeping_information[dataset_name] = self.datasets_information \
4000  # [dataset_name]["num_events"]
4001  continue
4002 
4003  tmp = [i for i in \
4004  self.datasets_information[dataset_name] \
4005  ["num_events"].items() if i[1] < 1]
4006  tmp = [i for i in tmp if i[0] in self.datasets_to_use[dataset_name]]
4007  empty_runs = dict(tmp)
4008  if len(empty_runs) > 0:
4009  for empty_run in empty_runs:
4010  try:
4011  dataset_names_after_checks[dataset_name].remove(empty_run)
4012  except KeyError:
4013  pass
4014  self.logger.info(" removed %d empty run(s) from dataset `%s'" % \
4015  (len(empty_runs), dataset_name))
4016  self.logger.debug(" (%s)" % \
4017  ", ".join([str(i) for i in empty_runs]))
4018 
4019  ###
4020 
4021  # If we emptied out a complete dataset, remove the whole
4022  # thing.
4023  dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4024  for (dataset_name, runs) in six.iteritems(dataset_names_after_checks):
4025  if len(runs) < 1:
4026  self.logger.warning(" Removing dataset without any runs " \
4027  "(left) `%s'" % \
4028  dataset_name)
4029  del dataset_names_after_checks_tmp[dataset_name]
4030  dataset_names_after_checks = dataset_names_after_checks_tmp
4031 
4032  ###
4033 
4034  self.logger.warning(" --> Removed %d dataset(s)" % \
4035  (len(self.datasets_to_use) -
4036  len(dataset_names_after_checks)))
4037 
4038  # Now store the modified version of the dataset list.
4039  self.datasets_to_use = dataset_names_after_checks
4040 
4041  # End of check_dataset_list.
4042 
4043  ##########
4044 
4045  def escape_dataset_name(self, dataset_name):
4046  """Escape a DBS dataset name.
4047 
4048  Escape a DBS dataset name such that it does not cause trouble
4049  with the file system. This means turning each `/' into `__',
4050  except for the first one which is just removed.
4051 
4052  """
4053 
4054  escaped_dataset_name = dataset_name
4055  escaped_dataset_name = escaped_dataset_name.strip("/")
4056  escaped_dataset_name = escaped_dataset_name.replace("/", "__")
4057 
4058  return escaped_dataset_name
4059 
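# Example of the escaping performed above (dataset name is illustrative):
##   self.escape_dataset_name("/MinimumBias/Commissioning10-v4/RECO")
##   # -> "MinimumBias__Commissioning10-v4__RECO"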
4060  ##########
4061 
4062  # BUG BUG BUG
4063  # This is a bit of a redundant method, isn't it?
4064  def create_config_file_name(self, dataset_name, run_number):
4065  """Generate the name of the configuration file to be run by
4066  CRAB.
4067 
4068  Depending on the harvesting mode (single-step or two-step)
4069  this is the name of the real harvesting configuration or the
4070  name of the first-step ME summary extraction configuration.
4071 
4072  """
4073 
4074  if self.harvesting_mode == "single-step":
4075  config_file_name = self.create_harvesting_config_file_name(dataset_name)
4076  elif self.harvesting_mode == "single-step-allow-partial":
4077  config_file_name = self.create_harvesting_config_file_name(dataset_name)
4078 ## # Only add the alarming piece to the file name if this is
4079 ## # a spread-out dataset.
4080 ## pdb.set_trace()
4081 ## if self.datasets_information[dataset_name] \
4082 ## ["mirrored"][run_number] == False:
4083 ## config_file_name = config_file_name.replace(".py", "_partial.py")
4084  elif self.harvesting_mode == "two-step":
4085  config_file_name = self.create_me_summary_config_file_name(dataset_name)
4086  else:
4087  assert False, "ERROR Unknown harvesting mode `%s'" % \
4088  self.harvesting_mode
4089 
4090  # End of create_config_file_name.
4091  return config_file_name
4092  # BUG BUG BUG end
4093 
4094  ##########
4095 
4096  def create_harvesting_config_file_name(self, dataset_name):
4097  "Generate the name to be used for the harvesting config file."
4098 
4099  file_name_base = "harvesting.py"
4100  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4101  config_file_name = file_name_base.replace(".py",
4102  "_%s.py" % \
4103  dataset_name_escaped)
4104 
4105  # End of create_harvesting_config_file_name.
4106  return config_file_name
4107 
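# Example (illustrative dataset name):
##   self.create_harvesting_config_file_name("/MinimumBias/Commissioning10-v4/RECO")
##   # -> "harvesting_MinimumBias__Commissioning10-v4__RECO.py"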
4108  ##########
4109 
4110  def create_me_summary_config_file_name(self, dataset_name):
4111  "Generate the name of the ME summary extraction config file."
4112 
4113  file_name_base = "me_extraction.py"
4114  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4115  config_file_name = file_name_base.replace(".py",
4116  "_%s.py" % \
4117  dataset_name_escaped)
4118 
4119  # End of create_me_summary_config_file_name.
4120  return config_file_name
4121 
4122  ##########
4123 
4124  def create_output_file_name(self, dataset_name, run_number=None):
4125  """Create the name of the output file name to be used.
4126 
4127  This is the name of the output file of the `first step'. In
4128  the case of single-step harvesting this is already the final
4129  harvesting output ROOT file. In the case of two-step
4130  harvesting it is the name of the intermediary ME summary
4131  file.
4132 
4133  """
4134 
4135  # BUG BUG BUG
4136  # This method has become a bit of a mess. Originally it was
4137  # nice to have one entry point for both single- and two-step
4138  # output file names. However, now the former needs the run
4139  # number, while the latter does not even know about run
4140  # numbers. This should be fixed up a bit.
4141  # BUG BUG BUG end
4142 
4143  if self.harvesting_mode == "single-step":
4144  # DEBUG DEBUG DEBUG
4145  assert not run_number is None
4146  # DEBUG DEBUG DEBUG end
4147  output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4148  elif self.harvesting_mode == "single-step-allow-partial":
4149  # DEBUG DEBUG DEBUG
4150  assert not run_number is None
4151  # DEBUG DEBUG DEBUG end
4152  output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4153  elif self.harvesting_mode == "two-step":
4154  # DEBUG DEBUG DEBUG
4155  assert run_number is None
4156  # DEBUG DEBUG DEBUG end
4157  output_file_name = self.create_me_summary_output_file_name(dataset_name)
4158  else:
4159  # This should not be possible, but hey...
4160  assert False, "ERROR Unknown harvesting mode `%s'" % \
4161  self.harvesting_mode
4162 
4163  # End of create_harvesting_output_file_name.
4164  return output_file_name
4165 
4166  ##########
4167 
4168  def create_harvesting_output_file_name(self, dataset_name, run_number):
4169  """Generate the name to be used for the harvesting output file.
4170 
4171  This harvesting output file is the _final_ ROOT output file
4172  containing the harvesting results. In case of two-step
4173  harvesting there is an intermediate ME output file as well.
4174 
4175  """
4176 
4177  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4178 
4179  # Hmmm, looking at the code for the DQMFileSaver this might
4180  # actually be the place where the first part of this file
4181  # naming scheme comes from.
4182  # NOTE: It looks like the `V0001' comes from the DQM
4183  # version. This is something that cannot be looked up from
4184  # here, so let's hope it does not change too often.
4185  output_file_name = "DQM_V0001_R%09d__%s.root" % \
4186  (run_number, dataset_name_escaped)
4187  if self.harvesting_mode.find("partial") > -1:
4188  # Only add the alarming piece to the file name if this is
4189  # a spread-out dataset.
4190  if self.datasets_information[dataset_name] \
4191  ["mirrored"][run_number] == False:
4192  output_file_name = output_file_name.replace(".root", \
4193  "_partial.root")
4194 
4195  # End of create_harvesting_output_file_name.
4196  return output_file_name
4197 
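# Example (illustrative dataset name and run number):
##   self.create_harvesting_output_file_name("/MinimumBias/Commissioning10-v4/RECO", 135149)
##   # -> "DQM_V0001_R000135149__MinimumBias__Commissioning10-v4__RECO.root"
##   # (a `_partial' suffix is appended in partial-harvesting mode for
##   #  runs that are not mirrored at a single site)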
4198  ##########
4199 
4200  def create_me_summary_output_file_name(self, dataset_name):
4201  """Generate the name of the intermediate ME file name to be
4202  used in two-step harvesting.
4203 
4204  """
4205 
4206  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4207  output_file_name = "me_summary_%s.root" % \
4208  dataset_name_escaped
4209 
4210  # End of create_me_summary_output_file_name.
4211  return output_file_name
4212 
4213  ##########
4214 
4215  def create_multicrab_block_name(self, dataset_name, run_number, index):
4216  """Create the block name to use for this dataset/run number.
4217 
4218  This is what appears in the brackets `[]' in multicrab.cfg. It
4219  is used as the name of the job and to create output
4220  directories.
4221 
4222  """
4223 
4224  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4225  block_name = "%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4226 
4227  # End of create_multicrab_block_name.
4228  return block_name
4229 
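# Example (illustrative values; the index is the `site_NN' string built
# in create_multicrab_config):
##   self.create_multicrab_block_name("/MinimumBias/Commissioning10-v4/RECO",
##                                    135149, "site_01")
##   # -> "MinimumBias__Commissioning10-v4__RECO_000135149_site_01"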
4230  ##########
4231 
 4232  def create_crab_config(self):
 4233  """Create a CRAB configuration for a given job.
4234 
4235  NOTE: This is _not_ a complete (as in: submittable) CRAB
4236  configuration. It is used to store the common settings for the
4237  multicrab configuration.
4238 
4239  NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4240 
4241  NOTE: According to CRAB, you `Must define exactly two of
4242  total_number_of_events, events_per_job, or
4243  number_of_jobs.'. For single-step harvesting we force one job,
4244  for the rest we don't really care.
4245 
4246  # BUG BUG BUG
4247  # With the current version of CRAB (2.6.1), in which Daniele
4248  # fixed the behaviour of no_block_boundary for me, one _has to
4249  # specify_ the total_number_of_events and one single site in
4250  # the se_white_list.
4251  # BUG BUG BUG end
4252 
4253  """
4254 
4255  tmp = []
4256 
4257  # This is the stuff we will need to fill in.
4258  castor_prefix = self.castor_prefix
4259 
4260  tmp.append(self.config_file_header())
4261  tmp.append("")
4262 
4263  ## CRAB
4264  ##------
4265  tmp.append("[CRAB]")
4266  tmp.append("jobtype = cmssw")
4267  tmp.append("")
4268 
4269  ## GRID
4270  ##------
4271  tmp.append("[GRID]")
4272  tmp.append("virtual_organization=cms")
4273  tmp.append("")
4274 
4275  ## USER
4276  ##------
4277  tmp.append("[USER]")
4278  tmp.append("copy_data = 1")
4279  tmp.append("")
4280 
4281  ## CMSSW
4282  ##-------
4283  tmp.append("[CMSSW]")
4284  tmp.append("# This reveals data hosted on T1 sites,")
4285  tmp.append("# which is normally hidden by CRAB.")
4286  tmp.append("show_prod = 1")
4287  tmp.append("number_of_jobs = 1")
4288  if self.Jsonlumi == True:
4289  tmp.append("lumi_mask = %s" % self.Jsonfilename)
4290  tmp.append("total_number_of_lumis = -1")
4291  else:
4292  if self.harvesting_type == "DQMOffline":
4293  tmp.append("total_number_of_lumis = -1")
4294  else:
4295  tmp.append("total_number_of_events = -1")
4296  if self.harvesting_mode.find("single-step") > -1:
4297  tmp.append("# Force everything to run in one job.")
4298  tmp.append("no_block_boundary = 1")
4299  tmp.append("")
4300 
4301  ## CAF
4302  ##-----
4303  tmp.append("[CAF]")
4304 
4305  crab_config = "\n".join(tmp)
4306 
4307  # End of create_crab_config.
4308  return crab_config
4309 
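# Roughly what the returned crab.cfg contents look like for a
# single-step, non-DQMOffline job without a JSON lumi mask (the header
# produced by config_file_header() is omitted here):
##   [CRAB]
##   jobtype = cmssw
##
##   [GRID]
##   virtual_organization=cms
##
##   [USER]
##   copy_data = 1
##
##   [CMSSW]
##   # This reveals data hosted on T1 sites,
##   # which is normally hidden by CRAB.
##   show_prod = 1
##   number_of_jobs = 1
##   total_number_of_events = -1
##   # Force everything to run in one job.
##   no_block_boundary = 1
##
##   [CAF]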
4310  ##########
4311 
 4312  def create_multicrab_config(self):
 4313  """Create a multicrab.cfg file for all samples.
4314 
4315  This creates the contents for a multicrab.cfg file that uses
4316  the crab.cfg file (generated elsewhere) for the basic settings
4317  and contains blocks for each run of each dataset.
4318 
4319  # BUG BUG BUG
4320  # The fact that it's necessary to specify the se_white_list
4321  # and the total_number_of_events is due to our use of CRAB
4322  # version 2.6.1. This should no longer be necessary in the
4323  # future.
4324  # BUG BUG BUG end
4325 
4326  """
4327 
 4328  cmd="who am i | cut -f1 -d' '"
4329  (status, output)=commands.getstatusoutput(cmd)
4330  UserName = output
4331 
4332  if self.caf_access == True:
4333  print("Extracting %s as user name" %UserName)
4334 
4335  number_max_sites = self.nr_max_sites + 1
4336 
4337  multicrab_config_lines = []
4338  multicrab_config_lines.append(self.config_file_header())
4339  multicrab_config_lines.append("")
4340  multicrab_config_lines.append("[MULTICRAB]")
4341  multicrab_config_lines.append("cfg = crab.cfg")
4342  multicrab_config_lines.append("")
4343 
4344  dataset_names = sorted(self.datasets_to_use.keys())
4345 
4346  for dataset_name in dataset_names:
4347  runs = self.datasets_to_use[dataset_name]
4348  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4349  castor_prefix = self.castor_prefix
4350 
4351  for run in runs:
4352 
4353  # CASTOR output dir.
4354  castor_dir = self.datasets_information[dataset_name] \
4355  ["castor_path"][run]
4356 
4357  cmd = "rfdir %s" % castor_dir
4358  (status, output) = commands.getstatusoutput(cmd)
4359 
4360  if len(output) <= 0:
4361 
4362  # DEBUG DEBUG DEBUG
4363  # We should only get here if we're treating a
4364  # dataset/run that is fully contained at a single
4365  # site.
4366  assert (len(self.datasets_information[dataset_name] \
4367  ["sites"][run]) == 1) or \
4368  self.datasets_information[dataset_name]["mirrored"]
4369  # DEBUG DEBUG DEBUG end
4370 
4371  site_names = self.datasets_information[dataset_name] \
4372  ["sites"][run].keys()
4373 
4374  for i in range(1, number_max_sites, 1):
4375  if len(site_names) > 0:
4376  index = "site_%02d" % (i)
4377 
4378  config_file_name = self. \
4379  create_config_file_name(dataset_name, run)
4380  output_file_name = self. \
4381  create_output_file_name(dataset_name, run)
4382 
4383 
4384  # If we're looking at a mirrored dataset we just pick
4385  # one of the sites. Otherwise there is nothing to
4386  # choose.
4387 
4388  # Loop variable
4389  loop = 0
4390 
4391  if len(site_names) > 1:
4392  cmssw_version = self.datasets_information[dataset_name] \
4393  ["cmssw_version"]
4394  self.logger.info("Picking site for mirrored dataset " \
4395  "`%s', run %d" % \
4396  (dataset_name, run))
4397  site_name = self.pick_a_site(site_names, cmssw_version)
4398  if site_name in site_names:
4399  site_names.remove(site_name)
4400 
4401  else:
4402  site_name = site_names[0]
4403  site_names.remove(site_name)
4404 
4405  if site_name is self.no_matching_site_found_str:
4406  if loop < 1:
4407  break
4408 
4409  nevents = self.datasets_information[dataset_name]["num_events"][run]
4410 
4411  # The block name.
4412  multicrab_block_name = self.create_multicrab_block_name( \
4413  dataset_name, run, index)
4414  multicrab_config_lines.append("[%s]" % \
4415  multicrab_block_name)
4416 
4417  ## CRAB
4418  ##------
4419  if site_name == "caf.cern.ch":
4420  multicrab_config_lines.append("CRAB.use_server=0")
4421  multicrab_config_lines.append("CRAB.scheduler=caf")
4422  else:
4423  multicrab_config_lines.append("scheduler = glite")
4424 
4425  ## GRID
4426  ##------
4427  if site_name == "caf.cern.ch":
4428  pass
4429  else:
4430  multicrab_config_lines.append("GRID.se_white_list = %s" % \
4431  site_name)
4432  multicrab_config_lines.append("# This removes the default blacklisting of T1 sites.")
4433  multicrab_config_lines.append("GRID.remove_default_blacklist = 1")
4434  multicrab_config_lines.append("GRID.rb = CERN")
4435  if not self.non_t1access:
4436  multicrab_config_lines.append("GRID.role = t1access")
4437 
4438  ## USER
4439  ##------
4440 
4441  castor_dir = castor_dir.replace(castor_prefix, "")
4442  multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4443  multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4444  castor_dir)
4445  multicrab_config_lines.append("USER.check_user_remote_dir=0")
4446 
4447  if site_name == "caf.cern.ch":
4448  multicrab_config_lines.append("USER.storage_path=%s" % castor_prefix)
4449  #multicrab_config_lines.append("USER.storage_element=T2_CH_CAF")
4450  #castor_dir = castor_dir.replace("/cms/store/caf/user/%s" %UserName, "")
4451  #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4452  # castor_dir)
4453  else:
4454  multicrab_config_lines.append("USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4455  #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4456  # castor_dir)
4457  #multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4458 
4459  ## CMSSW
4460  ##-------
4461  multicrab_config_lines.append("CMSSW.pset = %s" % \
4462  config_file_name)
4463  multicrab_config_lines.append("CMSSW.datasetpath = %s" % \
4464  dataset_name)
4465  multicrab_config_lines.append("CMSSW.runselection = %d" % \
4466  run)
4467 
4468  if self.Jsonlumi == True:
4469  pass
4470  else:
4471  if self.harvesting_type == "DQMOffline":
4472  pass
4473  else:
4474  multicrab_config_lines.append("CMSSW.total_number_of_events = %d" % \
4475  nevents)
4476  # The output file name.
4477  multicrab_config_lines.append("CMSSW.output_file = %s" % \
4478  output_file_name)
4479 
4480  ## CAF
4481  ##-----
4482  if site_name == "caf.cern.ch":
4483  multicrab_config_lines.append("CAF.queue=cmscaf1nd")
4484 
4485 
4486  # End of block.
4487  multicrab_config_lines.append("")
4488 
4489  loop = loop + 1
4490 
4491  self.all_sites_found = True
4492 
4493  multicrab_config = "\n".join(multicrab_config_lines)
4494 
4495  # End of create_multicrab_config.
4496  return multicrab_config
4497 
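# Roughly what one generated block looks like for a DQMOffline run
# hosted at a regular (non-CAF) site with the t1access role requested;
# the site name, dataset and CASTOR paths are placeholders (for other
# harvesting types a CMSSW.total_number_of_events line is added too):
##   [MinimumBias__Commissioning10-v4__RECO_000135149_site_01]
##   scheduler = glite
##   GRID.se_white_list = T2_XX_Example
##   # This removes the default blacklisting of T1 sites.
##   GRID.remove_default_blacklist = 1
##   GRID.rb = CERN
##   GRID.role = t1access
##   USER.storage_element=srm-cms.cern.ch
##   USER.user_remote_dir = <castor_dir relative to the CASTOR prefix>
##   USER.check_user_remote_dir=0
##   USER.storage_path=/srm/managerv2?SFN=<castor_prefix>
##   CMSSW.pset = harvesting_MinimumBias__Commissioning10-v4__RECO.py
##   CMSSW.datasetpath = /MinimumBias/Commissioning10-v4/RECO
##   CMSSW.runselection = 135149
##   CMSSW.output_file = DQM_V0001_R000135149__MinimumBias__Commissioning10-v4__RECO.root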
4498  ##########
4499 
4500  def check_globaltag(self, globaltag=None):
4501  """Check if globaltag exists.
4502 
4503  Check if globaltag exists as GlobalTag in the database given
4504  by self.frontier_connection_name['globaltag']. If globaltag is
4505  None, self.globaltag is used instead.
4506 
4507  If we're going to use reference histograms this method also
4508  checks for the existence of the required key in the GlobalTag.
4509 
4510  """
4511 
4512  if globaltag is None:
4513  globaltag = self.globaltag
4514 
4515  # All GlobalTags should end in `::All', right?
4516  if globaltag.endswith("::All"):
4517  globaltag = globaltag[:-5]
4518 
4519  connect_name = self.frontier_connection_name["globaltag"]
4520  # BUG BUG BUG
4521  # There is a bug in cmscond_tagtree_list: some magic is
4522  # missing from the implementation requiring one to specify
4523  # explicitly the name of the squid to connect to. Since the
4524  # cmsHarvester can only be run from the CERN network anyway,
4525  # cmsfrontier:8000 is hard-coded in here. Not nice but it
4526  # works.
4527  connect_name = connect_name.replace("frontier://",
4528  "frontier://cmsfrontier:8000/")
4529  # BUG BUG BUG end
4530  connect_name += self.db_account_name_cms_cond_globaltag()
4531 
4532  tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4533 
4534  #----------
4535 
4536  tag_contains_ref_hist_key = False
4537  if self.use_ref_hists and tag_exists:
4538  # Check for the key required to use reference histograms.
4539  tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4540 
4541  #----------
4542 
4543  if self.use_ref_hists:
4544  ret_val = tag_exists and tag_contains_ref_hist_key
4545  else:
4546  ret_val = tag_exists
4547 
4548  #----------
4549 
4550  # End of check_globaltag.
4551  return ret_val
4552 
4553  ##########
4554 
4555  def check_globaltag_exists(self, globaltag, connect_name):
4556  """Check if globaltag exists.
4557 
4558  """
4559 
4560  self.logger.info("Checking existence of GlobalTag `%s'" % \
4561  globaltag)
4562  self.logger.debug(" (Using database connection `%s')" % \
4563  connect_name)
4564 
4565  cmd = "cmscond_tagtree_list -c %s -T %s" % \
4566  (connect_name, globaltag)
4567  (status, output) = commands.getstatusoutput(cmd)
4568  if status != 0 or \
4569  output.find("error") > -1:
4570  msg = "Could not check existence of GlobalTag `%s' in `%s'" % \
4571  (globaltag, connect_name)
4572  if output.find(".ALL_TABLES not found") > -1:
4573  msg = "%s\n" \
4574  "Missing database account `%s'" % \
4575  (msg, output.split(".ALL_TABLES")[0].split()[-1])
4576  self.logger.fatal(msg)
4577  self.logger.debug("Command used:")
4578  self.logger.debug(" %s" % cmd)
4579  self.logger.debug("Output received:")
4580  self.logger.debug(output)
4581  raise Error(msg)
4582  if output.find("does not exist") > -1:
4583  self.logger.debug("GlobalTag `%s' does not exist in `%s':" % \
4584  (globaltag, connect_name))
4585  self.logger.debug("Output received:")
4586  self.logger.debug(output)
4587  tag_exists = False
4588  else:
4589  tag_exists = True
4590  self.logger.info(" GlobalTag exists? -> %s" % tag_exists)
4591 
4592  # End of check_globaltag_exists.
4593  return tag_exists
4594 
4595  ##########
4596 
4597  def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name):
4598  """Check if globaltag contains the required RefHistos key.
4599 
4600  """
4601 
4602  # Check for the key required to use reference histograms.
4603  tag_contains_key = None
4604  ref_hist_key = "RefHistos"
4605  self.logger.info("Checking existence of reference " \
4606  "histogram key `%s' in GlobalTag `%s'" % \
4607  (ref_hist_key, globaltag))
4608  self.logger.debug(" (Using database connection `%s')" % \
4609  connect_name)
4610  cmd = "cmscond_tagtree_list -c %s -T %s -n %s" % \
4611  (connect_name, globaltag, ref_hist_key)
4612  (status, output) = commands.getstatusoutput(cmd)
4613  if status != 0 or \
4614  output.find("error") > -1:
 4615  msg = "Could not check existence of key `%s' in `%s'" % \
 4616  (ref_hist_key, connect_name)
4617  self.logger.fatal(msg)
4618  self.logger.debug("Command used:")
4619  self.logger.debug(" %s" % cmd)
4620  self.logger.debug("Output received:")
4621  self.logger.debug(" %s" % output)
4622  raise Error(msg)
4623  if len(output) < 1:
4624  self.logger.debug("Required key for use of reference " \
4625  "histograms `%s' does not exist " \
4626  "in GlobalTag `%s':" % \
4627  (ref_hist_key, globaltag))
4628  self.logger.debug("Output received:")
4629  self.logger.debug(output)
4630  tag_contains_key = False
4631  else:
4632  tag_contains_key = True
4633 
4634  self.logger.info(" GlobalTag contains `%s' key? -> %s" % \
4635  (ref_hist_key, tag_contains_key))
4636 
4637  # End of check_globaltag_contains_ref_hist_key.
4638  return tag_contains_key
4639 
4640  ##########
4641 
4642  def check_ref_hist_tag(self, tag_name):
4643  """Check the existence of tag_name in database connect_name.
4644 
4645  Check if tag_name exists as a reference histogram tag in the
4646  database given by self.frontier_connection_name['refhists'].
4647 
4648  """
4649 
4650  connect_name = self.frontier_connection_name["refhists"]
4651  connect_name += self.db_account_name_cms_cond_dqm_summary()
4652 
4653  self.logger.debug("Checking existence of reference " \
4654  "histogram tag `%s'" % \
4655  tag_name)
4656  self.logger.debug(" (Using database connection `%s')" % \
4657  connect_name)
4658 
4659  cmd = "cmscond_list_iov -c %s" % \
4660  connect_name
4661  (status, output) = commands.getstatusoutput(cmd)
4662  if status != 0:
4663  msg = "Could not check existence of tag `%s' in `%s'" % \
4664  (tag_name, connect_name)
4665  self.logger.fatal(msg)
4666  self.logger.debug("Command used:")
4667  self.logger.debug(" %s" % cmd)
4668  self.logger.debug("Output received:")
4669  self.logger.debug(output)
4670  raise Error(msg)
4671  if not tag_name in output.split():
4672  self.logger.debug("Reference histogram tag `%s' " \
4673  "does not exist in `%s'" % \
4674  (tag_name, connect_name))
4675  self.logger.debug(" Existing tags: `%s'" % \
4676  "', `".join(output.split()))
4677  tag_exists = False
4678  else:
4679  tag_exists = True
4680  self.logger.debug(" Reference histogram tag exists? " \
4681  "-> %s" % tag_exists)
4682 
4683  # End of check_ref_hist_tag.
4684  return tag_exists
4685 
4686  ##########
4687 
4688  def create_es_prefer_snippet(self, dataset_name):
4689  """Build the es_prefer snippet for the reference histograms.
4690 
4691  The building of the snippet is wrapped in some care-taking
4692  code that figures out the name of the reference histogram set
4693  and makes sure the corresponding tag exists.
4694 
4695  """
4696 
4697  # Figure out the name of the reference histograms tag.
4698  # NOTE: The existence of these tags has already been checked.
4699  ref_hist_tag_name = self.ref_hist_mappings[dataset_name]
4700 
4701  connect_name = self.frontier_connection_name["refhists"]
4702  connect_name += self.db_account_name_cms_cond_dqm_summary()
4703  record_name = "DQMReferenceHistogramRootFileRcd"
4704 
4705  # Build up the code snippet.
4706  code_lines = []
4707  code_lines.append("from CondCore.DBCommon.CondDBSetup_cfi import *")
4708  code_lines.append("process.ref_hist_source = cms.ESSource(\"PoolDBESSource\", CondDBSetup,")
4709  code_lines.append(" connect = cms.string(\"%s\")," % connect_name)
4710  code_lines.append(" toGet = cms.VPSet(cms.PSet(record = cms.string(\"%s\")," % record_name)
4711  code_lines.append(" tag = cms.string(\"%s\"))," % ref_hist_tag_name)
4712  code_lines.append(" )")
4713  code_lines.append(" )")
4714  code_lines.append("process.es_prefer_ref_hist_source = cms.ESPrefer(\"PoolDBESSource\", \"ref_hist_source\")")
4715 
4716  snippet = "\n".join(code_lines)
4717 
4718  # End of create_es_prefer_snippet.
4719  return snippet
4720 
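# Roughly what the generated snippet looks like; the connection string
# and tag name are placeholders for the values looked up above:
##   from CondCore.DBCommon.CondDBSetup_cfi import *
##   process.ref_hist_source = cms.ESSource("PoolDBESSource", CondDBSetup,
##       connect = cms.string("<refhists frontier connection>"),
##       toGet = cms.VPSet(cms.PSet(record = cms.string("DQMReferenceHistogramRootFileRcd"),
##                                  tag = cms.string("<ref_hist_tag_name>")),
##                         )
##       )
##   process.es_prefer_ref_hist_source = cms.ESPrefer("PoolDBESSource", "ref_hist_source")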
4721  ##########
4722 
4723  def create_harvesting_config(self, dataset_name):
4724  """Create the Python harvesting configuration for harvesting.
4725 
4726  The basic configuration is created by
4727  Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4728  what cmsDriver.py does.) After that we add some specials
4729  ourselves.
4730 
4731  NOTE: On one hand it may not be nice to circumvent
4732  cmsDriver.py, on the other hand cmsDriver.py does not really
4733  do anything itself. All the real work is done by the
4734  ConfigBuilder so there is not much risk that we miss out on
4735  essential developments of cmsDriver in the future.
4736 
4737  """
4738 
4739  # Setup some options needed by the ConfigBuilder.
4740  config_options = defaultOptions
4741 
4742  # These are fixed for all kinds of harvesting jobs. Some of
4743  # them are not needed for the harvesting config, but to keep
4744  # the ConfigBuilder happy.
4745  config_options.name = "harvesting"
4746  config_options.scenario = "pp"
4747  config_options.number = 1
4748  config_options.arguments = self.ident_string()
4749  config_options.evt_type = config_options.name
4750  config_options.customisation_file = None
4751  config_options.filein = "dummy_value"
4752  config_options.filetype = "EDM"
4753  # This seems to be new in CMSSW 3.3.X, no clue what it does.
4754  config_options.gflash = "dummy_value"
4755  # This seems to be new in CMSSW 3.3.0.pre6, no clue what it
4756  # does.
4757  #config_options.himix = "dummy_value"
4758  config_options.dbsquery = ""
4759 
4760  ###
4761 
4762  # These options depend on the type of harvesting we're doing
4763  # and are stored in self.harvesting_info.
4764 
4765  config_options.step = "HARVESTING:%s" % \
4766  self.harvesting_info[self.harvesting_type] \
4767  ["step_string"]
4768  config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4769  ["beamspot"]
4770  config_options.eventcontent = self.harvesting_info \
4771  [self.harvesting_type] \
4772  ["eventcontent"]
4773  config_options.harvesting = self.harvesting_info \
4774  [self.harvesting_type] \
4775  ["harvesting"]
4776 
4777  ###
4778 
4779  # This one is required (see also above) for each dataset.
4780 
4781  datatype = self.datasets_information[dataset_name]["datatype"]
4782  config_options.isMC = (datatype.lower() == "mc")
4783  config_options.isData = (datatype.lower() == "data")
4784  globaltag = self.datasets_information[dataset_name]["globaltag"]
4785 
4786  config_options.conditions = self.format_conditions_string(globaltag)
4787 
4788  ###
4789 
4790  if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4791  # This is the case for 3.3.X.
4792  config_builder = ConfigBuilder(config_options, with_input=True)
4793  else:
4794  # This is the case in older CMSSW versions.
4795  config_builder = ConfigBuilder(config_options)
4796  config_builder.prepare(True)
4797  config_contents = config_builder.pythonCfgCode
4798 
4799  ###
4800 
 4801  # Add our signature to the top of the configuration, and add
4802  # some markers to the head and the tail of the Python code
4803  # generated by the ConfigBuilder.
4804  marker_lines = []
4805  sep = "#" * 30
4806  marker_lines.append(sep)
4807  marker_lines.append("# Code between these markers was generated by")
4808  marker_lines.append("# Configuration.PyReleaseValidation." \
4809  "ConfigBuilder")
4810 
4811  marker_lines.append(sep)
4812  marker = "\n".join(marker_lines)
4813 
4814  tmp = [self.config_file_header()]
4815  tmp.append("")
4816  tmp.append(marker)
4817  tmp.append("")
4818  tmp.append(config_contents)
4819  tmp.append("")
4820  tmp.append(marker)
4821  tmp.append("")
4822  config_contents = "\n".join(tmp)
4823 
4824  ###
4825 
4826  # Now we add some stuff of our own.
4827  customisations = [""]
4828 
4829  customisations.append("# Now follow some customisations")
4830  customisations.append("")
4831  connect_name = self.frontier_connection_name["globaltag"]
4832  connect_name += self.db_account_name_cms_cond_globaltag()
4833  customisations.append("process.GlobalTag.connect = \"%s\"" % \
4834  connect_name)
4835 
4836 
4837  if self.saveByLumiSection == True:
4838  customisations.append("process.dqmSaver.saveByLumiSection = 1")
4839 
4841 
4842  customisations.append("")
4843 
4844  # About the reference histograms... For data there is only one
4845  # set of references and those are picked up automatically
4846  # based on the GlobalTag. For MC we have to do some more work
4847  # since the reference histograms to be used depend on the MC
4848  # sample at hand. In this case we glue in an es_prefer snippet
4849  # to pick up the references. We do this only for RelVals since
4850  # for MC there are no meaningful references so far.
4851 
4852  # NOTE: Due to the lack of meaningful references for
4853  # MC samples reference histograms are explicitly
4854  # switched off in this case.
4855 
4856  use_es_prefer = (self.harvesting_type == "RelVal")
4857  use_refs = use_es_prefer or \
4858  (not self.harvesting_type == "MC")
4859  # Allow global override.
4860  use_refs = use_refs and self.use_ref_hists
4861 
4862  if not use_refs:
4863  # Disable reference histograms explicitly. The histograms
4864  # are loaded by the dqmRefHistoRootFileGetter
4865  # EDAnalyzer. This analyzer can be run from several
4866  # sequences. Here we remove it from each sequence that
4867  # exists.
4868  customisations.append("print \"Not using reference histograms\"")
4869  customisations.append("if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4870  customisations.append(" for (sequence_name, sequence) in six.iteritems(process.sequences):")
4871  customisations.append(" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4872  customisations.append(" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4873  customisations.append(" sequence_name")
4874  customisations.append("process.dqmSaver.referenceHandling = \"skip\"")
4875  else:
4876  # This makes sure all reference histograms are saved to
4877  # the output ROOT file.
4878  customisations.append("process.dqmSaver.referenceHandling = \"all\"")
4879  if use_es_prefer:
4880  es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4881  customisations.append(es_prefer_snippet)
4882 
4883  # Make sure we get the `workflow' correct. As far as I can see
4884  # this is only important for the output file name.
4885  workflow_name = dataset_name
4886  if self.harvesting_mode == "single-step-allow-partial":
4887  workflow_name += "_partial"
4888  customisations.append("process.dqmSaver.workflow = \"%s\"" % \
4889  workflow_name)
4890 
4891  # BUG BUG BUG
4892  # This still does not work. The current two-step harvesting
4893  # efforts are on hold waiting for the solution to come from
4894  # elsewhere. (In this case the elsewhere is Daniele Spiga.)
4895 
4896 ## # In case this file is the second step (the real harvesting
4897 ## # step) of the two-step harvesting we have to tell it to use
4898 ## # our local files.
4899 ## if self.harvesting_mode == "two-step":
4900 ## castor_dir = self.datasets_information[dataset_name] \
4901 ## ["castor_path"][run]
4902 ## customisations.append("")
4903 ## customisations.append("# This is the second step (the real")
4904 ## customisations.append("# harvesting step) of a two-step")
4905 ## customisations.append("# harvesting procedure.")
4906 ## # BUG BUG BUG
4907 ## # To be removed in production version.
4908 ## customisations.append("import pdb")
4909 ## # BUG BUG BUG end
4910 ## customisations.append("import commands")
4911 ## customisations.append("import os")
4912 ## customisations.append("castor_dir = \"%s\"" % castor_dir)
4913 ## customisations.append("cmd = \"rfdir %s\" % castor_dir")
4914 ## customisations.append("(status, output) = commands.getstatusoutput(cmd)")
4915 ## customisations.append("if status != 0:")
4916 ## customisations.append(" print \"ERROR\"")
4917 ## customisations.append(" raise Exception, \"ERROR\"")
4918 ## customisations.append("file_names = [os.path.join(\"rfio:%s\" % path, i) for i in output.split() if i.startswith(\"EDM_summary\") and i.endswith(\".root\")]")
4919 ## #customisations.append("pdb.set_trace()")
4920 ## customisations.append("process.source.fileNames = cms.untracked.vstring(*file_names)")
4921 ## customisations.append("")
4922 
4923  # BUG BUG BUG end
4924 
4925  config_contents = config_contents + "\n".join(customisations)
4926 
4927  ###
4928 
4929  # End of create_harvesting_config.
4930  return config_contents
4931 
4932 ## ##########
4933 
4934 ## def create_harvesting_config_two_step(self, dataset_name):
4935 ## """Create the Python harvesting configuration for two-step
4936 ## harvesting.
4937 
4938 ## """
4939 
4940 ## # BUG BUG BUG
4941 ## config_contents = self.create_harvesting_config_single_step(dataset_name)
4942 ## # BUG BUG BUG end
4943 
4944 ## # End of create_harvesting_config_two_step.
4945 ## return config_contents
4946 
4947  ##########
4948 
4949  def create_me_extraction_config(self, dataset_name):
4950  """Create the ME-extraction (ME-to-EDM conversion) configuration,
4951  used as the first step of the two-step harvesting.
4952  """
4953 
4954  # Big chunk of hard-coded Python. Not such a big deal since
4955  # this does not do much and is not likely to break.
4956  tmp = []
4957  tmp.append(self.config_file_header())
4958  tmp.append("")
4959  tmp.append("import FWCore.ParameterSet.Config as cms")
4960  tmp.append("")
4961  tmp.append("process = cms.Process(\"ME2EDM\")")
4962  tmp.append("")
4963  tmp.append("# Import of standard configurations")
4964  tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
4965  tmp.append("")
4966  tmp.append("# We don't really process any events, just keep this set to one to")
4967  tmp.append("# make sure things work.")
4968  tmp.append("process.maxEvents = cms.untracked.PSet(")
4969  tmp.append(" input = cms.untracked.int32(1)")
4970  tmp.append(" )")
4971  tmp.append("")
4972  tmp.append("process.options = cms.untracked.PSet(")
4973  tmp.append(" Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
4974  tmp.append(" )")
4975  tmp.append("")
4976  tmp.append("process.source = cms.Source(\"PoolSource\",")
4977  tmp.append(" processingMode = \\")
4978  tmp.append(" cms.untracked.string(\"RunsAndLumis\"),")
4979  tmp.append(" fileNames = \\")
4980  tmp.append(" cms.untracked.vstring(\"no_file_specified\")")
4981  tmp.append(" )")
4982  tmp.append("")
4983  tmp.append("# Output definition: drop everything except for the monitoring.")
4984  tmp.append("process.output = cms.OutputModule(")
4985  tmp.append(" \"PoolOutputModule\",")
4986  tmp.append(" outputCommands = \\")
4987  tmp.append(" cms.untracked.vstring(\"drop *\", \\")
4988  tmp.append(" \"keep *_MEtoEDMConverter_*_*\"),")
4989  output_file_name = self. \
4990  create_output_file_name(dataset_name)
4991  tmp.append(" fileName = \\")
4992  tmp.append(" cms.untracked.string(\"%s\")," % output_file_name)
4993  tmp.append(" dataset = cms.untracked.PSet(")
4994  tmp.append(" dataTier = cms.untracked.string(\"RECO\"),")
4995  tmp.append(" filterName = cms.untracked.string(\"\")")
4996  tmp.append(" )")
4997  tmp.append(" )")
4998  tmp.append("")
4999  tmp.append("# Additional output definition")
5000  tmp.append("process.out_step = cms.EndPath(process.output)")
5001  tmp.append("")
5002  tmp.append("# Schedule definition")
5003  tmp.append("process.schedule = cms.Schedule(process.out_step)")
5004  tmp.append("")
5005 
5006  config_contents = "\n".join(tmp)
5007 
5008  # End of create_me_extraction_config.
5009  return config_contents
5010 
5011  ##########
5012 
5013 ## def create_harvesting_config(self, dataset_name):
5014 ## """Create the Python harvesting configuration for a given job.
5015 
5016 ## NOTE: The reason to have a single harvesting configuration per
5017 ## sample is to be able to specify the GlobalTag corresponding to
5018 ## each sample. Since it has been decided that (apart from the
5019 ## prompt reco) datasets cannot contain runs with different
5020 ## GlobalTags, we don't need a harvesting config per run.
5021 
5022 ## NOTE: This is the place where we distinguish between
5023 ## single-step and two-step harvesting modes (at least for the
5024 ## Python job configuration).
5025 
5026 ## """
5027 
5028 ## ###
5029 
5030 ## if self.harvesting_mode == "single-step":
5031 ## config_contents = self.create_harvesting_config_single_step(dataset_name)
5032 ## elif self.harvesting_mode == "two-step":
5033 ## config_contents = self.create_harvesting_config_two_step(dataset_name)
5034 ## else:
5035 ## # Impossible harvesting mode, we should never get here.
5036 ## assert False, "ERROR: unknown harvesting mode `%s'" % \
5037 ## self.harvesting_mode
5038 
5039 ## ###
5040 
5041 ## # End of create_harvesting_config.
5042 ## return config_contents
5043 
5044  ##########
5045 
5046  def write_crab_config(self):
5047  """Write a CRAB job configuration Python file.
5048 
5049  """
5050 
5051  self.logger.info("Writing CRAB configuration...")
5052 
5053  file_name_base = "crab.cfg"
5054 
5055  # Create CRAB configuration.
5056  crab_contents = self.create_crab_config()
5057 
5058  # Write configuration to file.
5059  crab_file_name = file_name_base
5060  try:
5061  crab_file = open(crab_file_name, "w")
5062  crab_file.write(crab_contents)
5063  crab_file.close()
5064  except IOError:
5065  self.logger.fatal("Could not write " \
5066  "CRAB configuration to file `%s'" % \
5067  crab_file_name)
5068  raise Error("ERROR: Could not write to file `%s'!" % \
5069  crab_file_name)
5070 
5071  # End of write_crab_config.
5072 
5073  ##########
5074 
5075  def write_multicrab_config(self):
5076  """Write a multi-CRAB job configuration Python file.
5077 
5078  """
5079 
5080  self.logger.info("Writing multi-CRAB configuration...")
5081 
5082  file_name_base = "multicrab.cfg"
5083 
5084  # Create multi-CRAB configuration.
5085  multicrab_contents = self.create_multicrab_config()
5086 
5087  # Write configuration to file.
5088  multicrab_file_name = file_name_base
5089  try:
5090  multicrab_file = open(multicrab_file_name, "w")
5091  multicrab_file.write(multicrab_contents)
5092  multicrab_file.close()
5093  except IOError:
5094  self.logger.fatal("Could not write " \
5095  "multi-CRAB configuration to file `%s'" % \
5096  multicrab_file_name)
5097  raise Error("ERROR: Could not write to file `%s'!" % \
5098  multicrab_file_name)
5099 
5100  # End of write_multicrab_config.
5101 
5102  ##########
5103 
5104  def write_harvesting_config(self, dataset_name):
5105  """Write a harvesting job configuration Python file.
5106 
5107  NOTE: This knows nothing about single-step or two-step
5108  harvesting. That's all taken care of by
5109  create_harvesting_config.
5110 
5111  """
5112 
5113  self.logger.debug("Writing harvesting configuration for `%s'..." % \
5114  dataset_name)
5115 
5116  # Create Python configuration.
5117  config_contents = self.create_harvesting_config(dataset_name)
5118 
5119  # Write configuration to file.
5120  config_file_name = self. \
5121  create_harvesting_config_file_name(dataset_name)
5122  try:
5123  config_file = open(config_file_name, "w")
5124  config_file.write(config_contents)
5125  config_file.close()
5126  except IOError:
5127  self.logger.fatal("Could not write " \
5128  "harvesting configuration to file `%s'" % \
5129  config_file_name)
5130  raise Error("ERROR: Could not write to file `%s'!" % \
5131  config_file_name)
5132 
5133  # End of write_harvesting_config.
5134 
5135  ##########
5136 
5137  def write_me_extraction_config(self, dataset_name):
5138  """Write an ME-extraction configuration Python file.
5139 
5140  This `ME-extraction' (ME = Monitoring Element) is the first
5141  step of the two-step harvesting.
5142 
5143  """
5144 
5145  self.logger.debug("Writing ME-extraction configuration for `%s'..." % \
5146  dataset_name)
5147 
5148  # Create Python configuration.
5149  config_contents = self.create_me_extraction_config(dataset_name)
5150 
5151  # Write configuration to file.
5152  config_file_name = self. \
5153  create_me_summary_config_file_name(dataset_name)
5154  try:
5155  config_file = open(config_file_name, "w")
5156  config_file.write(config_contents)
5157  config_file.close()
5158  except IOError:
5159  self.logger.fatal("Could not write " \
5160  "ME-extraction configuration to file `%s'" % \
5161  config_file_name)
5162  raise Error("ERROR: Could not write to file `%s'!" % \
5163  config_file_name)
5164 
5165  # End of write_me_extraction_config.
5166 
5167  ##########
5168 
5169 
5170  def ref_hist_mappings_needed(self, dataset_name=None):
5171  """Check if we need to load and check the reference mappings.
5172 
5173  For data the reference histograms should be taken
5174  automatically from the GlobalTag, so we don't need any
5175  mappings. For RelVals we need to know a mapping to be used in
5176  the es_prefer code snippet (different references for each of
5177  the datasets).
5178 
5179  WARNING: This implementation is a bit convoluted.
5180 
5181  """
5182 
5183  # If no dataset name given, do everything, otherwise check
5184  # only this one dataset.
5185  if dataset_name is not None:
5186  data_type = self.datasets_information[dataset_name] \
5187  ["datatype"]
5188  mappings_needed = (data_type == "mc")
5189  # DEBUG DEBUG DEBUG
5190  if not mappings_needed:
5191  assert data_type == "data"
5192  # DEBUG DEBUG DEBUG end
5193  else:
5194  tmp = [self.ref_hist_mappings_needed(dataset_name) \
5195  for dataset_name in \
5196  self.datasets_information.keys()]
5197  mappings_needed = (True in tmp)
5198 
5199  # End of ref_hist_mappings_needed.
5200  return mappings_needed
5201 
5202  ##########
5203 
5204  def load_ref_hist_mappings(self):
5205  """Load the reference histogram mappings from file.
5206 
5207  The dataset name to reference histogram name mappings are read
5208  from a text file specified in self.ref_hist_mappings_file_name.
5209 
5210  """
5211 
5212  # DEBUG DEBUG DEBUG
5213  assert len(self.ref_hist_mappings) < 1, \
5214  "ERROR Should not be RE-loading " \
5215  "reference histogram mappings!"
5216  # DEBUG DEBUG DEBUG end
5217 
5218  self.logger.info("Loading reference histogram mappings " \
5219  "from file `%s'" % \
5220  self.ref_hist_mappings_file_name)
5221 
5222  mappings_lines = None
5223  try:
5224  mappings_file = open(self.ref_hist_mappings_file_name, "r")
5225  mappings_lines = mappings_file.readlines()
5226  mappings_file.close()
5227  except IOError:
5228  msg = "ERROR: Could not open reference histogram mapping "\
5229  "file `%s'" % self.ref_hist_mappings_file_name
5230  self.logger.fatal(msg)
5231  raise Error(msg)
5232 
5233  ##########
5234 
5235  # The format we expect is two whitespace-separated pieces per
5236  # line: the first is the dataset name for which the reference
5237  # should be used, the second the name of the reference
5238  # histogram in the database. An example is shown below.
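 # For illustration, a mapping file with two entries could look
 # like this (the dataset and tag names are hypothetical):
 #
 #   /RelValMinBias/CMSSW_X_Y_Z-START_VX-v1/GEN-SIM-RECO  RelValMinBias_reference
 #   /RelValTTbar/CMSSW_X_Y_Z-START_VX-v1/GEN-SIM-RECO    RelValTTbar_reference
 #
 # Lines starting with `#' are treated as comments and skipped.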
5239 
5240  for mapping in mappings_lines:
5241  # Skip comment lines.
5242  if not mapping.startswith("#"):
5243  mapping = mapping.strip()
5244  if len(mapping) > 0:
5245  mapping_pieces = mapping.split()
5246  if len(mapping_pieces) != 2:
5247  msg = "ERROR: The reference histogram mapping " \
5248  "file contains a line I don't " \
5249  "understand:\n %s" % mapping
5250  self.logger.fatal(msg)
5251  raise Error(msg)
5252  dataset_name = mapping_pieces[0].strip()
5253  ref_hist_name = mapping_pieces[1].strip()
5254  # We don't want people to accidentally specify
5255  # multiple mappings for the same dataset. Just
5256  # don't accept those cases.
5257  if dataset_name in self.ref_hist_mappings:
5258  msg = "ERROR: The reference histogram mapping " \
5259  "file contains multiple mappings for " \
5260  "dataset `%s'." % dataset_name
5261  self.logger.fatal(msg)
5262  raise Error(msg)
5263 
5264  # All is well that ends well.
5265  self.ref_hist_mappings[dataset_name] = ref_hist_name
5266 
5267  ##########
5268 
5269  self.logger.info(" Successfully loaded %d mapping(s)" % \
5270  len(self.ref_hist_mappings))
5271  max_len = max([len(i) for i in self.ref_hist_mappings.keys()])
5272  for (map_from, map_to) in six.iteritems(self.ref_hist_mappings):
5273  self.logger.info(" %-*s -> %s" % \
5274  (max_len, map_from, map_to))
5275 
5276  # End of load_ref_hist_mappings.
5277 
5278  ##########
5279 
5280  def check_ref_hist_mappings(self):
5281  """Make sure all necessary reference histograms exist.
5282 
5283  Check that for each of the datasets to be processed a
5284  reference histogram is specified and that that histogram
5285  exists in the database.
5286 
5287  NOTE: There's a little complication here. Since this whole
5288  thing was designed to allow (in principle) harvesting of both
5289  data and MC datasets in one go, we need to be careful to check
5290  the availability of reference mappings only for those
5291  datasets that need it.
5292 
5293  """
5294 
5295  self.logger.info("Checking reference histogram mappings")
5296 
5297  for dataset_name in self.datasets_to_use:
5298  try:
5299  ref_hist_name = self.ref_hist_mappings[dataset_name]
5300  except KeyError:
5301  msg = "ERROR: No reference histogram mapping found " \
5302  "for dataset `%s'" % \
5303  dataset_name
5304  self.logger.fatal(msg)
5305  raise Error(msg)
5306 
5307  if not self.check_ref_hist_tag(ref_hist_name):
5308  msg = "Reference histogram tag `%s' " \
5309  "(used for dataset `%s') does not exist!" % \
5310  (ref_hist_name, dataset_name)
5311  self.logger.fatal(msg)
5312  raise Usage(msg)
5313 
5314  self.logger.info(" Done checking reference histogram mappings.")
5315 
5316  # End of check_ref_hist_mappings.
5317 
5318  ##########
5319 
5320  def build_datasets_information(self):
5321  """Obtain all information on the datasets that we need to run.
5322 
5323  Use DBS to figure out all required information on our
5324  datasets, like the run numbers and the GlobalTag. All
5325  information is stored in the datasets_information member
5326  variable.
5327 
5328  """
5329 
5330  # Get a list of runs in the dataset.
5331  # NOTE: The harvesting has to be done run-by-run, so we
5332  # split up datasets based on the run numbers. Strictly
5333  # speaking this is not (yet?) necessary for Monte Carlo
5334  # since all those samples use run number 1. Still, this
5335  # general approach should work for all samples.
5336 
5337  # Now loop over all datasets in the list and process them.
5338  # NOTE: This processing has been split into several loops
5339  # to be easier to follow, sacrificing a bit of efficiency.
5340  self.datasets_information = {}
5341  self.logger.info("Collecting information for all datasets to process")
5342  dataset_names = sorted(self.datasets_to_use.keys())
5343  for dataset_name in dataset_names:
5344 
5345  # Tell the user which dataset: nice with many datasets.
5346  sep_line = "-" * 30
5347  self.logger.info(sep_line)
5348  self.logger.info(" `%s'" % dataset_name)
5349  self.logger.info(sep_line)
5350 
5351  runs = self.dbs_resolve_runs(dataset_name)
5352  self.logger.info(" found %d run(s)" % len(runs))
5353  if len(runs) > 0:
5354  self.logger.debug(" run number(s): %s" % \
5355  ", ".join([str(i) for i in runs]))
5356  else:
5357  # DEBUG DEBUG DEBUG
5358  # This should never happen after the DBS checks.
5359  self.logger.warning(" --> skipping dataset "
5360  "without any runs")
5361  assert False, "Panic: found a dataset without runs " \
5362  "after DBS checks!"
5363  # DEBUG DEBUG DEBUG end
5364 
5365  cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5366  self.logger.info(" found CMSSW version `%s'" % cmssw_version)
5367 
5368  # Figure out if this is data or MC.
5369  datatype = self.dbs_resolve_datatype(dataset_name)
5370  self.logger.info(" sample is data or MC? --> %s" % \
5371  datatype)
5372 
5373  ###
5374 
5375  # Try and figure out the GlobalTag to be used.
5376  if self.globaltag is None:
5377  globaltag = self.dbs_resolve_globaltag(dataset_name)
5378  else:
5379  globaltag = self.globaltag
5380 
5381  self.logger.info(" found GlobalTag `%s'" % globaltag)
5382 
5383  # DEBUG DEBUG DEBUG
5384  if globaltag == "":
5385  # Actually we should not even reach this point, after
5386  # our dataset sanity checks.
5387  assert datatype == "data", \
5388  "ERROR Empty GlobalTag for MC dataset!!!"
5389  # DEBUG DEBUG DEBUG end
5390 
5391  ###
5392 
5393  # DEBUG DEBUG DEBUG
5394  #tmp = self.dbs_check_dataset_spread_old(dataset_name)
5395  # DEBUG DEBUG DEBUG end
5396  sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5397 
5398  # Extract the total event counts.
5399  num_events = {}
5400  for run_number in sites_catalog.keys():
5401  num_events[run_number] = sites_catalog \
5402  [run_number]["all_sites"]
5403  del sites_catalog[run_number]["all_sites"]
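 # At this point num_events maps each run number to the total
 # number of events in that run, e.g. (hypothetical values):
 #   {123456: 1000, 123457: 2500}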
5404 
5405  # Extract the information about whether or not datasets
5406  # are mirrored.
5407  mirror_catalog = {}
5408  for run_number in sites_catalog.keys():
5409  mirror_catalog[run_number] = sites_catalog \
5410  [run_number]["mirrored"]
5411  del sites_catalog[run_number]["mirrored"]
5412 
5413  # BUG BUG BUG
5414  # I think I could now get rid of that and just fill the
5415  # "sites" entry with the `inverse' of this
5416  # num_events_catalog(?).
5417  #num_sites = self.dbs_resolve_dataset_number_of_sites(dataset_name)
5418  #sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5419  #sites_catalog = dict(zip(num_events_catalog.keys(),
5420  # [[j for i in num_events_catalog.values() for j in i.keys()]]))
5421  # BUG BUG BUG end
5422 
5423 ## # DEBUG DEBUG DEBUG
5424 ## # This is probably only useful to make sure we don't muck
5425 ## # things up, right?
5426 ## # Figure out across how many sites this sample has been spread.
5427 ## if num_sites == 1:
5428 ## self.logger.info(" sample is contained at a single site")
5429 ## else:
5430 ## self.logger.info(" sample is spread across %d sites" % \
5431 ## num_sites)
5432 ## if num_sites < 1:
5433 ## # NOTE: This _should not_ happen with any valid dataset.
5434 ## self.logger.warning(" --> skipping dataset which is not " \
5435 ## "hosted anywhere")
5436 ## # DEBUG DEBUG DEBUG end
5437 
5438  # Now put everything in a place where we can find it again
5439  # if we need it.
5440  self.datasets_information[dataset_name] = {}
5441  self.datasets_information[dataset_name]["runs"] = runs
5442  self.datasets_information[dataset_name]["cmssw_version"] = \
5443  cmssw_version
5444  self.datasets_information[dataset_name]["globaltag"] = globaltag
5445  self.datasets_information[dataset_name]["datatype"] = datatype
5446  self.datasets_information[dataset_name]["num_events"] = num_events
5447  self.datasets_information[dataset_name]["mirrored"] = mirror_catalog
5448  self.datasets_information[dataset_name]["sites"] = sites_catalog
5449 
5450  # Each run of each dataset has a different CASTOR output
5451  # path.
5452  castor_path_common = self.create_castor_path_name_common(dataset_name)
5453  self.logger.info(" output will go into `%s'" % \
5454  castor_path_common)
5455 
5456  castor_paths = dict(list(zip(runs,
5457  [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5458  for i in runs])))
5459  for path_name in castor_paths.values():
5460  self.logger.debug(" %s" % path_name)
5461  self.datasets_information[dataset_name]["castor_path"] = \
5462  castor_paths
5463 
5464  # End of build_datasets_information.
5465 
5466  ##########
5467 
5468  def show_exit_message(self):
5469  """Tell the user what to do now, after this part is done.
5470 
5471  This should provide the user with some (preferably
5472  copy-pasteable) instructions on what to do now with the setups
5473  and files that have been created.
5474 
5475  """
5476 
5477  # TODO TODO TODO
5478  # This could be improved a bit.
5479  # TODO TODO TODO end
5480 
5481  sep_line = "-" * 60
5482 
5483  self.logger.info("")
5484  self.logger.info(sep_line)
5485  self.logger.info(" Configuration files have been created.")
5486  self.logger.info(" From here on please follow the usual CRAB instructions.")
5487  self.logger.info(" Quick copy-paste instructions are shown below.")
5488  self.logger.info(sep_line)
5489 
5490  self.logger.info("")
5491  self.logger.info(" Create all CRAB jobs:")
5492  self.logger.info(" multicrab -create")
5493  self.logger.info("")
5494  self.logger.info(" Submit all CRAB jobs:")
5495  self.logger.info(" multicrab -submit")
5496  self.logger.info("")
5497  self.logger.info(" Check CRAB status:")
5498  self.logger.info(" multicrab -status")
5499  self.logger.info("")
5500 
5501  self.logger.info("")
5502  self.logger.info(" For more information please see the CMS Twiki:")
5503  self.logger.info(" %s" % twiki_url)
5504  self.logger.info(sep_line)
5505 
5506  # If there were any jobs for which we could not find a
5507  # matching site show a warning message about that.
5508  if not self.all_sites_found:
5509  self.logger.warning(" For some of the jobs no matching " \
5510  "site could be found")
5511  self.logger.warning(" --> please scan your multicrab.cfg " \
5512  "for occurrences of `%s'." % \
5513  self.no_matching_site_found_str)
5514  self.logger.warning(" You will have to fix those " \
5515  "by hand, sorry.")
5516 
5517  # End of show_exit_message.
5518 
5519  ##########
5520 
5521  def run(self):
5522  "Main entry point of the CMS harvester."
5523 
5524  # Start with a positive thought.
5525  exit_code = 0
5526 
5527  try:
5528 
5529  try:
5530 
5531  # Parse all command line options and arguments
5532  self.parse_cmd_line_options()
5533  # and check that they make sense.
5534  self.check_input_status()
5535 
5536  # Check if CMSSW is setup.
5537  self.check_cmssw()
5538 
5539  # Check if DBS is setup,
5540  self.check_dbs()
5541  # and if all is fine setup the Python side.
5542  self.setup_dbs()
5543 
5544  # Fill our dictionary with all the required info we
5545  # need to understand harvesting jobs. This needs to be
5546  # done after the CMSSW version is known.
5547  self.setup_harvesting_info()
5548 
5549  # Obtain list of dataset names to consider
5550  self.build_dataset_use_list()
5551  # and the list of dataset names to ignore.
5552  self.build_dataset_ignore_list()
5553 
5554  # The same for the runs lists (if specified).
5555  self.build_runs_use_list()
5556  self.build_runs_ignore_list()
5557 
5558  # Process the list of datasets to ignore and fold that
5559  # into the list of datasets to consider.
5560  # NOTE: The run-based selection is done later since
5561  # right now we don't know yet which runs a dataset
5562  # contains.
5563  self.process_dataset_ignore_list()
5564 
5565  # Obtain all required information on the datasets,
5566  # like run numbers and GlobalTags.
5567  self.build_datasets_information()
5568 
5569  if self.use_ref_hists and \
5570  self.ref_hist_mappings_needed():
5571  # Load the dataset name to reference histogram
5572  # name mappings from file.
5573  self.load_ref_hist_mappings()
5574  # Now make sure that for all datasets we want to
5575  # process there is a reference defined. Otherwise
5576  # just bomb out before wasting any more time.
5577  self.check_ref_hist_mappings()
5578  else:
5579  self.logger.info("No need to load reference " \
5580  "histogram mappings file")
5581 
5582  # OBSOLETE OBSOLETE OBSOLETE
5583 ## # TODO TODO TODO
5584 ## # Need to think about where this should go, but
5585 ## # somewhere we have to move over the fact that we want
5586 ## # to process all runs for each dataset that we're
5587 ## # considering. This basically means copying over the
5588 ## # information from self.datasets_information[]["runs"]
5589 ## # to self.datasets_to_use[].
5590 ## for dataset_name in self.datasets_to_use.keys():
5591 ## self.datasets_to_use[dataset_name] = self.datasets_information[dataset_name]["runs"]
5592 ## # TODO TODO TODO end
5593  # OBSOLETE OBSOLETE OBSOLETE end
5594 
5595  self.process_runs_use_and_ignore_lists()
5596 
5597  # If we've been asked to sacrifice some parts of
5598  # spread-out samples in order to be able to partially
5599  # harvest them, we'll do that here.
5600  if self.harvesting_mode == "single-step-allow-partial":
5601  self.singlify_datasets()
5602 
5603  # Check dataset name(s)
5604  self.check_dataset_list()
5605  # and see if there is anything left to do.
5606  if len(self.datasets_to_use) < 1:
5607  self.logger.info("After all checks etc. " \
5608  "there are no datasets (left?) " \
5609  "to process")
5610  else:
5611 
5612  self.logger.info("After all checks etc. we are left " \
5613  "with %d dataset(s) to process " \
5614  "for a total of %d runs" % \
5615  (len(self.datasets_to_use),
5616  sum([len(i) for i in \
5617  self.datasets_to_use.values()])))
5618 
5619  # NOTE: The order in which things are done here is
5620  # important. At the end of the job, independently of
5621  # how it ends (exception, CTRL-C, normal end), the
5622  # book keeping is written to file. At that time it
5623  # should be clear which jobs are done and can be
5624  # submitted. This means we first create the
5625  # general files, and then the per-job config
5626  # files.
5627 
5628  # TODO TODO TODO
5629  # It would be good to modify the book keeping a
5630  # bit. Now we write the crab.cfg (which is the
5631  # same for all samples and runs) and the
5632  # multicrab.cfg (which contains blocks for all
5633  # runs of all samples) without updating our book
5634  # keeping. The only place we update the book
5635  # keeping is after writing the harvesting config
5636  # file for a given dataset. Since there is only
5637  # one single harvesting configuration for each
5638  # dataset, we have no book keeping information on
5639  # a per-run basis.
5640  # TODO TODO TODO end
5641 
5642  # Check if the CASTOR output area exists. If
5643  # necessary create it.
5644  self.create_and_check_castor_dirs()
5645 
5646  # Create one crab and one multicrab configuration
5647  # for all jobs together.
5648  self.write_crab_config()
5649  self.write_multicrab_config()
5650 
5651  # Loop over all datasets and create harvesting
5652  # config files for all of them. One harvesting
5653  # config per dataset is enough. The same file will
5654  # be re-used by CRAB for each run.
5655  # NOTE: We always need a harvesting
5656  # configuration. For the two-step harvesting we
5657  # also need a configuration file for the first
5658  # step: the monitoring element extraction.
5659  for dataset_name in self.datasets_to_use.keys():
5660  try:
5661  self.write_harvesting_config(dataset_name)
5662  if self.harvesting_mode == "two-step":
5663  self.write_me_extraction_config(dataset_name)
5664  except:
5665  # Doh! Just re-raise the damn thing.
5666  raise
5667  else:
5668 ## tmp = self.datasets_information[dataset_name] \
5669 ## ["num_events"]
5670  tmp = {}
5671  for run_number in self.datasets_to_use[dataset_name]:
5672  tmp[run_number] = self.datasets_information \
5673  [dataset_name]["num_events"][run_number]
5674  if dataset_name in self.book_keeping_information:
5675  self.book_keeping_information[dataset_name].update(tmp)
5676  else:
5677  self.book_keeping_information[dataset_name] = tmp
5678 
5679  # Explain to the user what to do now.
5680  self.show_exit_message()
5681 
5682  except Usage as err:
5683  # self.logger.fatal(err.msg)
5684  # self.option_parser.print_help()
5685  pass
5686 
5687  except Error as err:
5688  # self.logger.fatal(err.msg)
5689  exit_code = 1
5690 
5691  except Exception as err:
5692  # Hmmm, ignore keyboard interrupts from the
5693  # user. These are not a `serious problem'. We also
5694  # skip SystemExit, which is the exception thrown when
5695  # one calls sys.exit(). This, for example, is done by
5696  # the option parser after calling print_help(). We
5697  # also have to catch all `no such option'
5698  # complaints. Everything else we catch here is a
5699  # `serious problem'.
5700  if isinstance(err, SystemExit):
5701  self.logger.fatal(err.code)
5702  elif not isinstance(err, KeyboardInterrupt):
5703  self.logger.fatal("!" * 50)
5704  self.logger.fatal(" This looks like a serious problem.")
5705  self.logger.fatal(" If you are sure you followed all " \
5706  "instructions")
5707  self.logger.fatal(" please copy the below stack trace together")
5708  self.logger.fatal(" with a description of what you were doing to")
5709  self.logger.fatal(" jeroen.hegeman@cern.ch.")
5710  self.logger.fatal(" %s" % self.ident_string())
5711  self.logger.fatal("!" * 50)
5712  self.logger.fatal(str(err))
5713  import traceback
5714  traceback_string = traceback.format_exc()
5715  for line in traceback_string.split("\n"):
5716  self.logger.fatal(line)
5717  self.logger.fatal("!" * 50)
5718  exit_code = 2
5719 
5720  # This is the stuff that we should really do, no matter
5721  # what. Of course cleaning up after ourselves is also done
5722  # from this place. This also keeps track of the book keeping
5723  # so far. (This means that if half of the configuration files
5724  # were created before e.g. the disk was full, we should still
5725  # have a consistent book keeping file.)
5726  finally:
5727 
5728  self.cleanup()
5729 
5730  ###
5731 
5732  if self.crab_submission:
5733  os.system("multicrab -create")
5734  os.system("multicrab -submit")
5735 
5736  # End of run.
5737  return exit_code
5738 
5739  # End of CMSHarvester.
5740 
5741 ###########################################################################
5742 ## Main entry point.
5743 ###########################################################################
5744 
5745 if __name__ == "__main__":
5746  "Main entry point for harvesting."
5747 
5748  CMSHarvester().run()
5749 
5750  # Done.
5751 
5752 ###########################################################################