CMS 3D CMS Logo

cmsHarvester.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 ###########################################################################
4 ## File : cmsHarvest.py
5 ## Authors : Jeroen Hegeman (jeroen.hegeman@cern.ch)
6 ## Niklas Pietsch (niklas.pietsch@desy.de)
7 ## Franseco Costanza (francesco.costanza@desy.de)
8 ## Last change: 20100308
9 
14 
15 """Main program to run all kinds of harvesting.
16 
17 These are the basic kinds of harvesting implemented (contact me if
18 your favourite is missing):
19 
20 - RelVal : Run for release validation samples. Makes heavy use of MC
21  truth information.
22 
23 - RelValFS: FastSim RelVal.
24 
25 - MC : Run for MC samples.
26 
27 - DQMOffline : Run for real data (could also be run for MC).
28 
29 For the mappings of these harvesting types to sequence names please
30 see the setup_harvesting_info() and option_handler_list_types()
31 methods.
32 
33 """
34 from __future__ import print_function
35 
36 ###########################################################################
37 
38 from builtins import range
39 __version__ = "3.8.2p1" # (version jump to match release)
40 __author__ = "Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
41  "Niklas Pietsch (niklas.pietsch@desy.de)"
42 
43 twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
44 
45 ###########################################################################
46 
47 ###########################################################################
48 ## TODO list
49 
90 
91 import os
92 import sys
93 import commands
94 import re
95 import logging
96 import optparse
97 import datetime
98 import copy
99 from inspect import getargspec
100 from random import choice
101 
102 import six
103 
104 # These we need to communicate with DBS global DBSAPI
105 from DBSAPI.dbsApi import DbsApi
106 import DBSAPI.dbsException
108 from functools import reduce
109 # and these we need to parse the DBS output.
110 global xml
111 global SAXParseException
112 import xml.sax
113 from xml.sax import SAXParseException
114 
115 import Configuration.PyReleaseValidation
116 from Configuration.PyReleaseValidation.ConfigBuilder import \
117  ConfigBuilder, defaultOptions
118 # from Configuration.PyReleaseValidation.cmsDriverOptions import options, python_config_filename
119 
120 #import FWCore.ParameterSet.Config as cms
121 
122 # Debugging stuff.
123 import pdb
124 try:
125  import debug_hook
126 except ImportError:
127  pass
128 
129 ###########################################################################
130 ## Helper class: Usage exception.
131 ###########################################################################
133  def __init__(self, msg):
134  self.msg = msg
135  def __str__(self):
136  return repr(self.msg)
137 
138  # End of Usage.
139 
140 ###########################################################################
141 ## Helper class: Error exception.
142 ###########################################################################
144  def __init__(self, msg):
145  self.msg = msg
146  def __str__(self):
147  return repr(self.msg)
148 
149 ###########################################################################
150 ## Helper class: CMSHarvesterHelpFormatter.
151 ###########################################################################
152 
153 class CMSHarvesterHelpFormatter(optparse.IndentedHelpFormatter):
154  """Helper class to add some customised help output to cmsHarvester.
155 
156  We want to add some instructions, as well as a pointer to the CMS
157  Twiki.
158 
159  """
160 
161  def format_usage(self, usage):
162 
163  usage_lines = []
164 
165  sep_line = "-" * 60
166  usage_lines.append(sep_line)
167  usage_lines.append("Welcome to the CMS harvester, a (hopefully useful)")
168  usage_lines.append("tool to create harvesting configurations.")
169  usage_lines.append("For more information please have a look at the CMS Twiki:")
170  usage_lines.append(" %s" % twiki_url)
171  usage_lines.append(sep_line)
172  usage_lines.append("")
173 
174  # Since we only add to the output, we now just append the
175  # original output from IndentedHelpFormatter.
176  usage_lines.append(optparse.IndentedHelpFormatter. \
177  format_usage(self, usage))
178 
179  formatted_usage = "\n".join(usage_lines)
180  return formatted_usage
181 
182  # End of CMSHarvesterHelpFormatter.
183 
184 ###########################################################################
185 ## Helper class: DBSXMLHandler.
186 ###########################################################################
187 
188 class DBSXMLHandler(xml.sax.handler.ContentHandler):
189  """XML handler class to parse DBS results.
190 
191  The tricky thing here is that older DBS versions (2.0.5 and
192  earlier) return results in a different XML format than newer
193  versions. Previously the result values were returned as attributes
194  to the `result' element. The new approach returns result values as
195  contents of named elements.
196 
197  The old approach is handled directly in startElement(), the new
198  approach in characters().
199 
200  NOTE: All results are returned in the form of string values of
201  course!
202 
203  """
204 
205  # This is the required mapping from the name of the variable we
206  # ask for to what we call it ourselves. (Effectively this is the
207  # mapping between the old attribute key name and the new element
208  # name.)
209  mapping = {
210  "dataset" : "PATH",
211  "dataset.tag" : "PROCESSEDDATASET_GLOBALTAG",
212  "datatype.type" : "PRIMARYDSTYPE_TYPE",
213  "run" : "RUNS_RUNNUMBER",
214  "run.number" : "RUNS_RUNNUMBER",
215  "file.name" : "FILES_LOGICALFILENAME",
216  "file.numevents" : "FILES_NUMBEROFEVENTS",
217  "algo.version" : "APPVERSION_VERSION",
218  "site" : "STORAGEELEMENT_SENAME",
219  }
220 
221  def __init__(self, tag_names):
222  # This is a list used as stack to keep track of where we are
223  # in the element tree.
225  self.tag_names = tag_names
226  self.results = {}
227 
228  def startElement(self, name, attrs):
229  self.element_position.append(name)
230 
231  self.current_value = []
232 
233  #----------
234 
235  # This is to catch results from DBS 2.0.5 and earlier.
236  if name == "result":
237  for name in self.tag_names:
238  key = DBSXMLHandler.mapping[name]
239  value = str(attrs[key])
240  try:
241  self.results[name].append(value)
242  except KeyError:
243  self.results[name] = [value]
244 
245  #----------
246 
247  def endElement(self, name):
248  assert self.current_element() == name, \
249  "closing unopenend element `%s'" % name
250 
251  if self.current_element() in self.tag_names:
252  contents = "".join(self.current_value)
253  if self.current_element() in self.results:
254  self.results[self.current_element()].append(contents)
255  else:
256  self.results[self.current_element()] = [contents]
257 
258  self.element_position.pop()
259 
260  def characters(self, content):
261  # NOTE: It is possible that the actual contents of the tag
262  # gets split into multiple pieces. This method will be called
263  # for each of the pieces. This means we have to concatenate
264  # everything ourselves.
265  if self.current_element() in self.tag_names:
266  self.current_value.append(content)
267 
268  def current_element(self):
269  return self.element_position[-1]
270 
272  """Make sure that all results arrays have equal length.
273 
274  We should have received complete rows from DBS. I.e. all
275  results arrays in the handler should be of equal length.
276 
277  """
278 
279  results_valid = True
280 
281  res_names = self.results.keys()
282  if len(res_names) > 1:
283  for res_name in res_names[1:]:
284  res_tmp = self.results[res_name]
285  if len(res_tmp) != len(self.results[res_names[0]]):
286  results_valid = False
287 
288  return results_valid
289 
290  # End of DBSXMLHandler.
291 
292 ###########################################################################
293 ## CMSHarvester class.
294 ###########################################################################
295 
297  """Class to perform CMS harvesting.
298 
299  More documentation `obviously' to follow.
300 
301  """
302 
303  ##########
304 
305  def __init__(self, cmd_line_opts=None):
306  "Initialize class and process command line options."
307 
308  self.version = __version__
309 
310  # These are the harvesting types allowed. See the head of this
311  # file for more information.
313  "RelVal",
314  "RelValFS",
315  "MC",
316  "DQMOffline",
317  ]
318 
319  # These are the possible harvesting modes:
320  # - Single-step: harvesting takes place on-site in a single
321  # step. For each samples only a single ROOT file containing
322  # the harvesting results is returned.
323  # - Single-step-allow-partial: special hack to allow
324  # harvesting of partial statistics using single-step
325  # harvesting on spread-out samples.
326  # - Two-step: harvesting takes place in two steps. The first
327  # step returns a series of monitoring elenent summaries for
328  # each sample. The second step then merges these summaries
329  # locally and does the real harvesting. This second step
330  # produces the ROOT file containing the harvesting results.
332  "single-step",
333  "single-step-allow-partial",
334  "two-step"
335  ]
336 
337  # It is possible to specify a GlobalTag that will override any
338  # choices (regarding GlobalTags) made by the cmsHarvester.
339  # BUG BUG BUG
340  # For the moment, until I figure out a way to obtain the
341  # GlobalTag with which a given data (!) dataset was created,
342  # it is necessary to specify a GlobalTag for harvesting of
343  # data.
344  # BUG BUG BUG end
345  self.globaltag = None
346 
347  # It's also possible to switch off the use of reference
348  # histograms altogether.
349  self.use_ref_hists = True
350 
351  # The database name and account are hard-coded. They are not
352  # likely to change before the end-of-life of this tool. But of
353  # course there is a way to override this from the command
354  # line. One can even override the Frontier connection used for
355  # the GlobalTag and for the reference histograms
356  # independently. Please only use this for testing purposes.
358  self.frontier_connection_name["globaltag"] = "frontier://" \
359  "FrontierProd/"
360  self.frontier_connection_name["refhists"] = "frontier://" \
361  "FrontierProd/"
363  for key in self.frontier_connection_name.keys():
364  self.frontier_connection_overridden[key] = False
365 
366  # This contains information specific to each of the harvesting
367  # types. Used to create the harvesting configuration. It is
368  # filled by setup_harvesting_info().
369  self.harvesting_info = None
370 
371  ###
372 
373  # These are default `unused' values that will be filled in
374  # depending on the command line options.
375 
376  # The type of harvesting we're doing. See
377  # self.harvesting_types for allowed types.
378  self.harvesting_type = None
379 
380  # The harvesting mode, popularly known as single-step
381  # vs. two-step. The thing to remember at this point is that
382  # single-step is only possible for samples located completely
383  # at a single site (i.e. SE).
384  self.harvesting_mode = None
385  # BUG BUG BUG
386  # Default temporarily set to two-step until we can get staged
387  # jobs working with CRAB.
388  self.harvesting_mode_default = "single-step"
389  # BUG BUG BUG end
390 
391  # The input method: are we reading a dataset name (or regexp)
392  # directly from the command line or are we reading a file
393  # containing a list of dataset specifications. Actually we
394  # keep one of each for both datasets and runs.
395  self.input_method = {}
396  self.input_method["datasets"] = {}
397  self.input_method["datasets"]["use"] = None
398  self.input_method["datasets"]["ignore"] = None
399  self.input_method["runs"] = {}
400  self.input_method["runs"]["use"] = None
401  self.input_method["runs"]["ignore"] = None
402  self.input_method["runs"]["ignore"] = None
403  # The name of whatever input we're using.
404  self.input_name = {}
405  self.input_name["datasets"] = {}
406  self.input_name["datasets"]["use"] = None
407  self.input_name["datasets"]["ignore"] = None
408  self.input_name["runs"] = {}
409  self.input_name["runs"]["use"] = None
410  self.input_name["runs"]["ignore"] = None
411 
412  self.Jsonlumi = False
413  self.Jsonfilename = "YourJSON.txt"
414  self.Jsonrunfilename = "YourJSON.txt"
415  self.todofile = "YourToDofile.txt"
416 
417  # If this is true, we're running in `force mode'. In this case
418  # the sanity checks are performed but failure will not halt
419  # everything.
420  self.force_running = None
421 
422  # The base path of the output dir in CASTOR.
423  self.castor_base_dir = None
424  self.castor_base_dir_default = "/castor/cern.ch/" \
425  "cms/store/temp/" \
426  "dqm/offline/harvesting_output/"
427 
428  # The name of the file to be used for book keeping: which
429  # datasets, runs, etc. we have already processed.
431  self.book_keeping_file_name_default = "harvesting_accounting.txt"
432 
433  # The dataset name to reference histogram name mapping is read
434  # from a text file. The name of this file is kept in the
435  # following variable.
437  # And this is the default value.
438  self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"
439 
440  # Hmmm, hard-coded prefix of the CERN CASTOR area. This is the
441  # only supported CASTOR area.
442  # NOTE: Make sure this one starts with a `/'.
443  self.castor_prefix = "/castor/cern.ch"
444 
445  # Normally the central harvesting should be done using the
446  # `t1access' grid role. To be able to run without T1 access
447  # the --no-t1access flag can be used. This variable keeps
448  # track of that special mode.
449  self.non_t1access = False
450  self.caf_access = False
451  self.saveByLumiSection = False
452  self.crab_submission = False
453  self.nr_max_sites = 1
454 
455  self.preferred_site = "no preference"
456 
457  # This will become the list of datasets and runs to consider
458  self.datasets_to_use = {}
459  # and this will become the list of datasets and runs to skip.
461  # This, in turn, will hold all book keeping information.
463  # And this is where the dataset name to reference histogram
464  # name mapping is stored.
466 
467  # We're now also allowing run selection. This means we also
468  # have to keep list of runs requested and vetoed by the user.
469  self.runs_to_use = {}
470  self.runs_to_ignore = {}
471 
472  # Cache for CMSSW version availability at different sites.
474 
475  # Cache for checked GlobalTags.
477 
478  # Global flag to see if there were any jobs for which we could
479  # not find a matching site.
480  self.all_sites_found = True
481 
482  # Helper string centrally defined.
483  self.no_matching_site_found_str = "no_matching_site_found"
484 
485  # Store command line options for later use.
486  if cmd_line_opts is None:
487  cmd_line_opts = sys.argv[1:]
488  self.cmd_line_opts = cmd_line_opts
489 
490  # Set up the logger.
491  log_handler = logging.StreamHandler()
492  # This is the default log formatter, the debug option switches
493  # on some more information.
494  log_formatter = logging.Formatter("%(message)s")
495  log_handler.setFormatter(log_formatter)
496  logger = logging.getLogger()
497  logger.name = "main"
498  logger.addHandler(log_handler)
499  self.logger = logger
500  # The default output mode is quite verbose.
501  self.set_output_level("NORMAL")
502 
503  #logger.debug("Initialized successfully")
504 
505  # End of __init__.
506 
507  ##########
508 
509  def cleanup(self):
510  "Clean up after ourselves."
511 
512  # NOTE: This is the safe replacement of __del__.
513 
514  #self.logger.debug("All done -> shutting down")
515  logging.shutdown()
516 
517  # End of cleanup.
518 
519  ##########
520 
521  def time_stamp(self):
522  "Create a timestamp to use in the created config files."
523 
524  time_now = datetime.datetime.utcnow()
525  # We don't care about the microseconds.
526  time_now = time_now.replace(microsecond = 0)
527  time_stamp = "%sUTC" % datetime.datetime.isoformat(time_now)
528 
529  # End of time_stamp.
530  return time_stamp
531 
532  ##########
533 
534  def ident_string(self):
535  "Spit out an identification string for cmsHarvester.py."
536 
537  ident_str = "`cmsHarvester.py " \
538  "version %s': cmsHarvester.py %s" % \
539  (__version__,
540  reduce(lambda x, y: x+' '+y, sys.argv[1:]))
541 
542  return ident_str
543 
544  ##########
545 
546  def format_conditions_string(self, globaltag):
547  """Create the conditions string needed for `cmsDriver'.
548 
549  Just glueing the FrontierConditions bit in front of it really.
550 
551  """
552 
553  # Not very robust but okay. The idea is that if the user
554  # specified (since this cannot happen with GlobalTags coming
555  # from DBS) something containing `conditions', they probably
556  # know what they're doing and we should not muck things up. In
557  # all other cases we just assume we only received the
558  # GlobalTag part and we built the usual conditions string from
559  # that .
560  if globaltag.lower().find("conditions") > -1:
561  conditions_string = globaltag
562  else:
563  conditions_string = "FrontierConditions_GlobalTag,%s" % \
564  globaltag
565 
566  # End of format_conditions_string.
567  return conditions_string
568 
569  ##########
570 
572  """Return the database account name used to store the GlobalTag.
573 
574  The name of the database account depends (albeit weakly) on
575  the CMSSW release version.
576 
577  """
578 
579  # This never changed, unlike the cms_cond_31X_DQM_SUMMARY ->
580  # cms_cond_34X_DQM transition.
581  account_name = "CMS_COND_31X_GLOBALTAG"
582 
583  # End of db_account_name_cms_cond_globaltag.
584  return account_name
585 
586  ##########
587 
589  """See db_account_name_cms_cond_globaltag."""
590 
591  account_name = None
592  version = self.cmssw_version[6:11]
593  if version < "3_4_0":
594  account_name = "CMS_COND_31X_DQM_SUMMARY"
595  else:
596  account_name = "CMS_COND_34X"
597 
598  # End of db_account_name_cms_cond_dqm_summary.
599  return account_name
600 
601  ##########
602 
604  "Create a nice header to be used to mark the generated files."
605 
606  tmp = []
607 
608  time_stamp = self.time_stamp()
609  ident_str = self.ident_string()
610  tmp.append("# %s" % time_stamp)
611  tmp.append("# WARNING: This file was created automatically!")
612  tmp.append("")
613  tmp.append("# Created by %s" % ident_str)
614 
615  header = "\n".join(tmp)
616 
617  # End of config_file_header.
618  return header
619 
620  ##########
621 
622  def set_output_level(self, output_level):
623  """Adjust the level of output generated.
624 
625  Choices are:
626  - normal : default level of output
627  - quiet : less output than the default
628  - verbose : some additional information
629  - debug : lots more information, may be overwhelming
630 
631  NOTE: The debug option is a bit special in the sense that it
632  also modifies the output format.
633 
634  """
635 
636  # NOTE: These levels are hooked up to the ones used in the
637  # logging module.
638  output_levels = {
639  "NORMAL" : logging.INFO,
640  "QUIET" : logging.WARNING,
641  "VERBOSE" : logging.INFO,
642  "DEBUG" : logging.DEBUG
643  }
644 
645  output_level = output_level.upper()
646 
647  try:
648  # Update the logger.
649  self.log_level = output_levels[output_level]
650  self.logger.setLevel(self.log_level)
651  except KeyError:
652  # Show a complaint
653  self.logger.fatal("Unknown output level `%s'" % ouput_level)
654  # and re-raise an exception.
655  raise Exception
656 
657  # End of set_output_level.
658 
659  ##########
660 
661  def option_handler_debug(self, option, opt_str, value, parser):
662  """Switch to debug mode.
663 
664  This both increases the amount of output generated, as well as
665  changes the format used (more detailed information is given).
666 
667  """
668 
669  # Switch to a more informative log formatter for debugging.
670  log_formatter_debug = logging.Formatter("[%(levelname)s] " \
671  # NOTE: funcName was
672  # only implemented
673  # starting with python
674  # 2.5.
675  #"%(funcName)s() " \
676  #"@%(filename)s:%(lineno)d " \
677  "%(message)s")
678  # Hmmm, not very nice. This assumes there's only a single
679  # handler associated with the current logger.
680  log_handler = self.logger.handlers[0]
681  log_handler.setFormatter(log_formatter_debug)
682  self.set_output_level("DEBUG")
683 
684  # End of option_handler_debug.
685 
686  ##########
687 
688  def option_handler_quiet(self, option, opt_str, value, parser):
689  "Switch to quiet mode: less verbose."
690 
691  self.set_output_level("QUIET")
692 
693  # End of option_handler_quiet.
694 
695  ##########
696 
697  def option_handler_force(self, option, opt_str, value, parser):
698  """Switch on `force mode' in which case we don't brake for nobody.
699 
700  In so-called `force mode' all sanity checks are performed but
701  we don't halt on failure. Of course this requires some care
702  from the user.
703 
704  """
705 
706  self.logger.debug("Switching on `force mode'.")
707  self.force_running = True
708 
709  # End of option_handler_force.
710 
711  ##########
712 
713  def option_handler_harvesting_type(self, option, opt_str, value, parser):
714  """Set the harvesting type to be used.
715 
716  This checks that no harvesting type is already set, and sets
717  the harvesting type to be used to the one specified. If a
718  harvesting type is already set an exception is thrown. The
719  same happens when an unknown type is specified.
720 
721  """
722 
723  # Check for (in)valid harvesting types.
724  # NOTE: The matching is done in a bit of a complicated
725  # way. This allows the specification of the type to be
726  # case-insensitive while still ending up with the properly
727  # `cased' version afterwards.
728  value = value.lower()
729  harvesting_types_lowered = [i.lower() for i in self.harvesting_types]
730  try:
731  type_index = harvesting_types_lowered.index(value)
732  # If this works, we now have the index to the `properly
733  # cased' version of the harvesting type.
734  except ValueError:
735  self.logger.fatal("Unknown harvesting type `%s'" % \
736  value)
737  self.logger.fatal(" possible types are: %s" %
738  ", ".join(self.harvesting_types))
739  raise Usage("Unknown harvesting type `%s'" % \
740  value)
741 
742  # Check if multiple (by definition conflicting) harvesting
743  # types are being specified.
744  if not self.harvesting_type is None:
745  msg = "Only one harvesting type should be specified"
746  self.logger.fatal(msg)
747  raise Usage(msg)
748  self.harvesting_type = self.harvesting_types[type_index]
749 
750  self.logger.info("Harvesting type to be used: `%s'" % \
751  self.harvesting_type)
752 
753  # End of option_handler_harvesting_type.
754 
755  ##########
756 
757  def option_handler_harvesting_mode(self, option, opt_str, value, parser):
758  """Set the harvesting mode to be used.
759 
760  Single-step harvesting can be used for samples that are
761  located completely at a single site (= SE). Otherwise use
762  two-step mode.
763 
764  """
765 
766  # Check for valid mode.
767  harvesting_mode = value.lower()
768  if not harvesting_mode in self.harvesting_modes:
769  msg = "Unknown harvesting mode `%s'" % harvesting_mode
770  self.logger.fatal(msg)
771  self.logger.fatal(" possible modes are: %s" % \
772  ", ".join(self.harvesting_modes))
773  raise Usage(msg)
774 
775  # Check if we've been given only a single mode, otherwise
776  # complain.
777  if not self.harvesting_mode is None:
778  msg = "Only one harvesting mode should be specified"
779  self.logger.fatal(msg)
780  raise Usage(msg)
781  self.harvesting_mode = harvesting_mode
782 
783  self.logger.info("Harvesting mode to be used: `%s'" % \
784  self.harvesting_mode)
785 
786  # End of option_handler_harvesting_mode.
787 
788  ##########
789 
790  def option_handler_globaltag(self, option, opt_str, value, parser):
791  """Set the GlobalTag to be used, overriding our own choices.
792 
793  By default the cmsHarvester will use the GlobalTag with which
794  a given dataset was created also for the harvesting. The
795  --globaltag option is the way to override this behaviour.
796 
797  """
798 
799  # Make sure that this flag only occurred once.
800  if not self.globaltag is None:
801  msg = "Only one GlobalTag should be specified"
802  self.logger.fatal(msg)
803  raise Usage(msg)
804  self.globaltag = value
805 
806  self.logger.info("GlobalTag to be used: `%s'" % \
807  self.globaltag)
808 
809  # End of option_handler_globaltag.
810 
811  ##########
812 
813  def option_handler_no_ref_hists(self, option, opt_str, value, parser):
814  "Switch use of all reference histograms off."
815 
816  self.use_ref_hists = False
817 
818  self.logger.warning("Switching off all use of reference histograms")
819 
820  # End of option_handler_no_ref_hists.
821 
822  ##########
823 
824  def option_handler_frontier_connection(self, option, opt_str,
825  value, parser):
826  """Override the default Frontier connection string.
827 
828  Please only use this for testing (e.g. when a test payload has
829  been inserted into cms_orc_off instead of cms_orc_on).
830 
831  This method gets called for three different command line
832  options:
833  - --frontier-connection,
834  - --frontier-connection-for-globaltag,
835  - --frontier-connection-for-refhists.
836  Appropriate care has to be taken to make sure things are only
837  specified once.
838 
839  """
840 
841  # Figure out with which command line option we've been called.
842  frontier_type = opt_str.split("-")[-1]
843  if frontier_type == "connection":
844  # Main option: change all connection strings.
845  frontier_types = self.frontier_connection_name.keys()
846  else:
847  frontier_types = [frontier_type]
848 
849  # Make sure that each Frontier connection is specified only
850  # once. (Okay, in a bit of a dodgy way...)
851  for connection_name in frontier_types:
852  if self.frontier_connection_overridden[connection_name] == True:
853  msg = "Please specify either:\n" \
854  " `--frontier-connection' to change the " \
855  "Frontier connection used for everything, or\n" \
856  "either one or both of\n" \
857  " `--frontier-connection-for-globaltag' to " \
858  "change the Frontier connection used for the " \
859  "GlobalTag and\n" \
860  " `--frontier-connection-for-refhists' to change " \
861  "the Frontier connection used for the " \
862  "reference histograms."
863  self.logger.fatal(msg)
864  raise Usage(msg)
865 
866  frontier_prefix = "frontier://"
867  if not value.startswith(frontier_prefix):
868  msg = "Expecting Frontier connections to start with " \
869  "`%s'. You specified `%s'." % \
870  (frontier_prefix, value)
871  self.logger.fatal(msg)
872  raise Usage(msg)
873  # We also kind of expect this to be either FrontierPrep or
874  # FrontierProd (but this is just a warning).
875  if value.find("FrontierProd") < 0 and \
876  value.find("FrontierProd") < 0:
877  msg = "Expecting Frontier connections to contain either " \
878  "`FrontierProd' or `FrontierPrep'. You specified " \
879  "`%s'. Are you sure?" % \
880  value
881  self.logger.warning(msg)
882 
883  if not value.endswith("/"):
884  value += "/"
885 
886  for connection_name in frontier_types:
887  self.frontier_connection_name[connection_name] = value
888  self.frontier_connection_overridden[connection_name] = True
889 
890  frontier_type_str = "unknown"
891  if connection_name == "globaltag":
892  frontier_type_str = "the GlobalTag"
893  elif connection_name == "refhists":
894  frontier_type_str = "the reference histograms"
895 
896  self.logger.warning("Overriding default Frontier " \
897  "connection for %s " \
898  "with `%s'" % \
899  (frontier_type_str,
900  self.frontier_connection_name[connection_name]))
901 
902  # End of option_handler_frontier_connection
903 
904  ##########
905 
906  def option_handler_input_todofile(self, option, opt_str, value, parser):
907 
908  self.todofile = value
909  # End of option_handler_input_todofile.
910 
911  ##########
912 
913  def option_handler_input_Jsonfile(self, option, opt_str, value, parser):
914 
915  self.Jsonfilename = value
916  # End of option_handler_input_Jsonfile.
917 
918  ##########
919 
920  def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser):
921 
922  self.Jsonrunfilename = value
923  # End of option_handler_input_Jsonrunfile.
924 
925  ##########
926 
927  def option_handler_input_spec(self, option, opt_str, value, parser):
928  """TODO TODO TODO
929  Document this...
930 
931  """
932 
933  # Figure out if we were called for the `use these' or the
934  # `ignore these' case.
935  if opt_str.lower().find("ignore") > -1:
936  spec_type = "ignore"
937  else:
938  spec_type = "use"
939 
940  # Similar: are we being called for datasets or for runs?
941  if opt_str.lower().find("dataset") > -1:
942  select_type = "datasets"
943  else:
944  select_type = "runs"
945 
946  if not self.input_method[select_type][spec_type] is None:
947  msg = "Please only specify one input method " \
948  "(for the `%s' case)" % opt_str
949  self.logger.fatal(msg)
950  raise Usage(msg)
951 
952  input_method = opt_str.replace("-", "").replace("ignore", "")
953  self.input_method[select_type][spec_type] = input_method
954  self.input_name[select_type][spec_type] = value
955 
956  self.logger.debug("Input method for the `%s' case: %s" % \
957  (spec_type, input_method))
958 
959  # End of option_handler_input_spec
960 
961  ##########
962 
963  def option_handler_book_keeping_file(self, option, opt_str, value, parser):
964  """Store the name of the file to be used for book keeping.
965 
966  The only check done here is that only a single book keeping
967  file is specified.
968 
969  """
970 
971  file_name = value
972 
973  if not self.book_keeping_file_name is None:
974  msg = "Only one book keeping file should be specified"
975  self.logger.fatal(msg)
976  raise Usage(msg)
977  self.book_keeping_file_name = file_name
978 
979  self.logger.info("Book keeping file to be used: `%s'" % \
981 
982  # End of option_handler_book_keeping_file.
983 
984  ##########
985 
986  def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser):
987  """Store the name of the file for the ref. histogram mapping.
988 
989  """
990 
991  file_name = value
992 
993  if not self.ref_hist_mappings_file_name is None:
994  msg = "Only one reference histogram mapping file " \
995  "should be specified"
996  self.logger.fatal(msg)
997  raise Usage(msg)
998  self.ref_hist_mappings_file_name = file_name
999 
1000  self.logger.info("Reference histogram mapping file " \
1001  "to be used: `%s'" % \
1003 
1004  # End of option_handler_ref_hist_mapping_file.
1005 
1006  ##########
1007 
1008  # OBSOLETE OBSOLETE OBSOLETE
1009 
1010 ## def option_handler_dataset_name(self, option, opt_str, value, parser):
1011 ## """Specify the name(s) of the dataset(s) to be processed.
1012 
1013 ## It is checked to make sure that no dataset name or listfile
1014 ## names are given yet. If all is well (i.e. we still have a
1015 ## clean slate) the dataset name is stored for later use,
1016 ## otherwise a Usage exception is raised.
1017 
1018 ## """
1019 
1020 ## if not self.input_method is None:
1021 ## if self.input_method == "dataset":
1022 ## raise Usage("Please only feed me one dataset specification")
1023 ## elif self.input_method == "listfile":
1024 ## raise Usage("Cannot specify both dataset and input list file")
1025 ## else:
1026 ## assert False, "Unknown input method `%s'" % self.input_method
1027 ## self.input_method = "dataset"
1028 ## self.input_name = value
1029 ## self.logger.info("Input method used: %s" % self.input_method)
1030 
1031 ## # End of option_handler_dataset_name.
1032 
1033 ## ##########
1034 
1035 ## def option_handler_listfile_name(self, option, opt_str, value, parser):
1036 ## """Specify the input list file containing datasets to be processed.
1037 
1038 ## It is checked to make sure that no dataset name or listfile
1039 ## names are given yet. If all is well (i.e. we still have a
1040 ## clean slate) the listfile name is stored for later use,
1041 ## otherwise a Usage exception is raised.
1042 
1043 ## """
1044 
1045 ## if not self.input_method is None:
1046 ## if self.input_method == "listfile":
1047 ## raise Usage("Please only feed me one list file")
1048 ## elif self.input_method == "dataset":
1049 ## raise Usage("Cannot specify both dataset and input list file")
1050 ## else:
1051 ## assert False, "Unknown input method `%s'" % self.input_method
1052 ## self.input_method = "listfile"
1053 ## self.input_name = value
1054 ## self.logger.info("Input method used: %s" % self.input_method)
1055 
1056 ## # End of option_handler_listfile_name.
1057 
1058  # OBSOLETE OBSOLETE OBSOLETE end
1059 
1060  ##########
1061 
1062  def option_handler_castor_dir(self, option, opt_str, value, parser):
1063  """Specify where on CASTOR the output should go.
1064 
1065  At the moment only output to CERN CASTOR is
1066  supported. Eventually the harvested results should go into the
1067  central place for DQM on CASTOR anyway.
1068 
1069  """
1070 
1071  # Check format of specified CASTOR area.
1072  castor_dir = value
1073  #castor_dir = castor_dir.lstrip(os.path.sep)
1074  castor_prefix = self.castor_prefix
1075 
1076  # Add a leading slash if necessary and clean up the path.
1077  castor_dir = os.path.join(os.path.sep, castor_dir)
1078  self.castor_base_dir = os.path.normpath(castor_dir)
1079 
1080  self.logger.info("CASTOR (base) area to be used: `%s'" % \
1081  self.castor_base_dir)
1082 
1083  # End of option_handler_castor_dir.
1084 
1085  ##########
1086 
1087  def option_handler_no_t1access(self, option, opt_str, value, parser):
1088  """Set the self.no_t1access flag to try and create jobs that
1089  run without special `t1access' role.
1090 
1091  """
1092 
1093  self.non_t1access = True
1094 
1095  self.logger.warning("Running in `non-t1access' mode. " \
1096  "Will try to create jobs that run " \
1097  "without special rights but no " \
1098  "further promises...")
1099 
1100  # End of option_handler_no_t1access.
1101 
1102  ##########
1103 
1104  def option_handler_caf_access(self, option, opt_str, value, parser):
1105  """Set the self.caf_access flag to try and create jobs that
1106  run on the CAF.
1107 
1108  """
1109  self.caf_access = True
1110 
1111  self.logger.warning("Running in `caf_access' mode. " \
1112  "Will try to create jobs that run " \
1113  "on CAF but no" \
1114  "further promises...")
1115 
1116  # End of option_handler_caf_access.
1117 
1118  ##########
1119 
1120  def option_handler_saveByLumiSection(self, option, opt_str, value, parser):
1121  """Set process.dqmSaver.saveByLumiSectiont=1 in cfg harvesting file
1122  """
1123  self.saveByLumiSection = True
1124 
1125  self.logger.warning("waning concerning saveByLumiSection option")
1126 
1127  # End of option_handler_saveByLumiSection.
1128 
1129 
1130  ##########
1131 
1132  def option_handler_crab_submission(self, option, opt_str, value, parser):
1133  """Crab jobs are not created and
1134  "submitted automatically",
1135  """
1136  self.crab_submission = True
1137 
1138  # End of option_handler_crab_submission.
1139 
1140  ##########
1141 
1142  def option_handler_sites(self, option, opt_str, value, parser):
1143 
1144  self.nr_max_sites = value
1145 
1146  ##########
1147 
1148  def option_handler_preferred_site(self, option, opt_str, value, parser):
1149 
1150  self.preferred_site = value
1151 
1152  ##########
1153 
1154  def option_handler_list_types(self, option, opt_str, value, parser):
1155  """List all harvesting types and their mappings.
1156 
1157  This lists all implemented harvesting types with their
1158  corresponding mappings to sequence names. This had to be
1159  separated out from the help since it depends on the CMSSW
1160  version and was making things a bit of a mess.
1161 
1162  NOTE: There is no way (at least not that I could come up with)
1163  to code this in a neat generic way that can be read both by
1164  this method and by setup_harvesting_info(). Please try hard to
1165  keep these two methods in sync!
1166 
1167  """
1168 
1169  sep_line = "-" * 50
1170  sep_line_short = "-" * 20
1171 
1172  print(sep_line)
1173  print("The following harvesting types are available:")
1174  print(sep_line)
1175 
1176  print("`RelVal' maps to:")
1177  print(" pre-3_3_0 : HARVESTING:validationHarvesting")
1178  print(" 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting")
1179  print(" Exceptions:")
1180  print(" 3_3_0_pre1-4 : HARVESTING:validationHarvesting")
1181  print(" 3_3_0_pre6 : HARVESTING:validationHarvesting")
1182  print(" 3_4_0_pre1 : HARVESTING:validationHarvesting")
1183 
1184  print(sep_line_short)
1185 
1186  print("`RelValFS' maps to:")
1187  print(" always : HARVESTING:validationHarvestingFS")
1188 
1189  print(sep_line_short)
1190 
1191  print("`MC' maps to:")
1192  print(" always : HARVESTING:validationprodHarvesting")
1193 
1194  print(sep_line_short)
1195 
1196  print("`DQMOffline' maps to:")
1197  print(" always : HARVESTING:dqmHarvesting")
1198 
1199  print(sep_line)
1200 
1201  # We're done, let's quit. (This is the same thing optparse
1202  # does after printing the help.)
1203  raise SystemExit
1204 
1205  # End of option_handler_list_types.
1206 
1207  ##########
1208 
1210  """Fill our dictionary with all info needed to understand
1211  harvesting.
1212 
1213  This depends on the CMSSW version since at some point the
1214  names and sequences were modified.
1215 
1216  NOTE: There is no way (at least not that I could come up with)
1217  to code this in a neat generic way that can be read both by
1218  this method and by option_handler_list_types(). Please try
1219  hard to keep these two methods in sync!
1220 
1221  """
1222 
1223  assert not self.cmssw_version is None, \
1224  "ERROR setup_harvesting() requires " \
1225  "self.cmssw_version to be set!!!"
1226 
1227  harvesting_info = {}
1228 
1229  # This is the version-independent part.
1230  harvesting_info["DQMOffline"] = {}
1231  harvesting_info["DQMOffline"]["beamspot"] = None
1232  harvesting_info["DQMOffline"]["eventcontent"] = None
1233  harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"
1234 
1235  harvesting_info["RelVal"] = {}
1236  harvesting_info["RelVal"]["beamspot"] = None
1237  harvesting_info["RelVal"]["eventcontent"] = None
1238  harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"
1239 
1240  harvesting_info["RelValFS"] = {}
1241  harvesting_info["RelValFS"]["beamspot"] = None
1242  harvesting_info["RelValFS"]["eventcontent"] = None
1243  harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"
1244 
1245  harvesting_info["MC"] = {}
1246  harvesting_info["MC"]["beamspot"] = None
1247  harvesting_info["MC"]["eventcontent"] = None
1248  harvesting_info["MC"]["harvesting"] = "AtRunEnd"
1249 
1250  # This is the version-dependent part. And I know, strictly
1251  # speaking it's not necessary to fill in all three types since
1252  # in a single run we'll only use one type anyway. This does
1253  # look more readable, however, and required less thought from
1254  # my side when I put this together.
1255 
1256  # DEBUG DEBUG DEBUG
1257  # Check that we understand our own version naming.
1258  assert self.cmssw_version.startswith("CMSSW_")
1259  # DEBUG DEBUG DEBUG end
1260 
1261  version = self.cmssw_version[6:]
1262 
1263  #----------
1264 
1265  # RelVal
1266  step_string = None
1267  if version < "3_3_0":
1268  step_string = "validationHarvesting"
1269  elif version in ["3_3_0_pre1", "3_3_0_pre2",
1270  "3_3_0_pre3", "3_3_0_pre4",
1271  "3_3_0_pre6", "3_4_0_pre1"]:
1272  step_string = "validationHarvesting"
1273  else:
1274  step_string = "validationHarvesting+dqmHarvesting"
1275 
1276  harvesting_info["RelVal"]["step_string"] = step_string
1277 
1278  # DEBUG DEBUG DEBUG
1279  # Let's make sure we found something.
1280  assert not step_string is None, \
1281  "ERROR Could not decide a RelVal harvesting sequence " \
1282  "for CMSSW version %s" % self.cmssw_version
1283  # DEBUG DEBUG DEBUG end
1284 
1285  #----------
1286 
1287  # RelVal
1288  step_string = "validationHarvestingFS"
1289 
1290  harvesting_info["RelValFS"]["step_string"] = step_string
1291 
1292  #----------
1293 
1294  # MC
1295  step_string = "validationprodHarvesting"
1296 
1297  harvesting_info["MC"]["step_string"] = step_string
1298 
1299  # DEBUG DEBUG DEBUG
1300  # Let's make sure we found something.
1301  assert not step_string is None, \
1302  "ERROR Could not decide a MC harvesting " \
1303  "sequence for CMSSW version %s" % self.cmssw_version
1304  # DEBUG DEBUG DEBUG end
1305 
1306  #----------
1307 
1308  # DQMOffline
1309  step_string = "dqmHarvesting"
1310 
1311  harvesting_info["DQMOffline"]["step_string"] = step_string
1312 
1313  #----------
1314 
1315  self.harvesting_info = harvesting_info
1316 
1317  self.logger.info("Based on the CMSSW version (%s) " \
1318  "I decided to use the `HARVESTING:%s' " \
1319  "sequence for %s harvesting" % \
1320  (self.cmssw_version,
1321  self.harvesting_info[self.harvesting_type]["step_string"],
1322  self.harvesting_type))
1323 
1324  # End of setup_harvesting_info.
1325 
1326  ##########
1327 
1328  def create_castor_path_name_common(self, dataset_name):
1329  """Build the common part of the output path to be used on
1330  CASTOR.
1331 
1332  This consists of the CASTOR area base path specified by the
1333  user and a piece depending on the data type (data vs. MC), the
1334  harvesting type and the dataset name followed by a piece
1335  containing the run number and event count. (See comments in
1336  create_castor_path_name_special for details.) This method
1337  creates the common part, without run number and event count.
1338 
1339  """
1340 
1341  castor_path = self.castor_base_dir
1342 
1343  ###
1344 
1345  # The data type: data vs. mc.
1346  datatype = self.datasets_information[dataset_name]["datatype"]
1347  datatype = datatype.lower()
1348  castor_path = os.path.join(castor_path, datatype)
1349 
1350  # The harvesting type.
1351  harvesting_type = self.harvesting_type
1352  harvesting_type = harvesting_type.lower()
1353  castor_path = os.path.join(castor_path, harvesting_type)
1354 
1355  # The CMSSW release version (only the `digits'). Note that the
1356  # CMSSW version used here is the version used for harvesting,
1357  # not the one from the dataset. This does make the results
1358  # slightly harder to find. On the other hand it solves
1359  # problems in case one re-harvests a given dataset with a
1360  # different CMSSW version, which would lead to ambiguous path
1361  # names. (Of course for many cases the harvesting is done with
1362  # the same CMSSW version the dataset was created with.)
1363  release_version = self.cmssw_version
1364  release_version = release_version.lower(). \
1365  replace("cmssw", ""). \
1366  strip("_")
1367  castor_path = os.path.join(castor_path, release_version)
1368 
1369  # The dataset name.
1370  dataset_name_escaped = self.escape_dataset_name(dataset_name)
1371  castor_path = os.path.join(castor_path, dataset_name_escaped)
1372 
1373  ###
1374 
1375  castor_path = os.path.normpath(castor_path)
1376 
1377  # End of create_castor_path_name_common.
1378  return castor_path
1379 
1380  ##########
1381 
1383  dataset_name, run_number,
1384  castor_path_common):
1385  """Create the specialised part of the CASTOR output dir name.
1386 
1387  NOTE: To avoid clashes with `incremental harvesting'
1388  (re-harvesting when a dataset grows) we have to include the
1389  event count in the path name. The underlying `problem' is that
1390  CRAB does not overwrite existing output files so if the output
1391  file already exists CRAB will fail to copy back the output.
1392 
1393  NOTE: It's not possible to create different kinds of
1394  harvesting jobs in a single call to this tool. However, in
1395  principle it could be possible to create both data and MC jobs
1396  in a single go.
1397 
1398  NOTE: The number of events used in the path name is the
1399  _total_ number of events in the dataset/run at the time of
1400  harvesting. If we're doing partial harvesting the final
1401  results will reflect lower statistics. This is a) the easiest
1402  to code and b) the least likely to lead to confusion if
1403  someone ever decides to swap/copy around file blocks between
1404  sites.
1405 
1406  """
1407 
1408  castor_path = castor_path_common
1409 
1410  ###
1411 
1412  # The run number part.
1413  castor_path = os.path.join(castor_path, "run_%d" % run_number)
1414 
1415  ###
1416 
1417  # The event count (i.e. the number of events we currently see
1418  # for this dataset).
1419  #nevents = self.datasets_information[dataset_name] \
1420  # ["num_events"][run_number]
1421  castor_path = os.path.join(castor_path, "nevents")
1422 
1423  ###
1424 
1425  castor_path = os.path.normpath(castor_path)
1426 
1427  # End of create_castor_path_name_special.
1428  return castor_path
1429 
1430  ##########
1431 
1433  """Make sure all required CASTOR output dirs exist.
1434 
1435  This checks the CASTOR base dir specified by the user as well
1436  as all the subdirs required by the current set of jobs.
1437 
1438  """
1439 
1440  self.logger.info("Checking (and if necessary creating) CASTOR " \
1441  "output area(s)...")
1442 
1443  # Call the real checker method for the base dir.
1444  self.create_and_check_castor_dir(self.castor_base_dir)
1445 
1446  # Now call the checker for all (unique) subdirs.
1447  castor_dirs = []
1448  for (dataset_name, runs) in six.iteritems(self.datasets_to_use):
1449 
1450  for run in runs:
1451  castor_dirs.append(self.datasets_information[dataset_name] \
1452  ["castor_path"][run])
1453  castor_dirs_unique = sorted(set(castor_dirs))
1454  # This can take some time. E.g. CRAFT08 has > 300 runs, each
1455  # of which will get a new directory. So we show some (rough)
1456  # info in between.
1457  ndirs = len(castor_dirs_unique)
1458  step = max(ndirs / 10, 1)
1459  for (i, castor_dir) in enumerate(castor_dirs_unique):
1460  if (i + 1) % step == 0 or \
1461  (i + 1) == ndirs:
1462  self.logger.info(" %d/%d" % \
1463  (i + 1, ndirs))
1464  self.create_and_check_castor_dir(castor_dir)
1465 
1466  # Now check if the directory is empty. If (an old version
1467  # of) the output file already exists CRAB will run new
1468  # jobs but never copy the results back. We assume the user
1469  # knows what they are doing and only issue a warning in
1470  # case the directory is not empty.
1471  self.logger.debug("Checking if path `%s' is empty" % \
1472  castor_dir)
1473  cmd = "rfdir %s" % castor_dir
1474  (status, output) = commands.getstatusoutput(cmd)
1475  if status != 0:
1476  msg = "Could not access directory `%s'" \
1477  " !!! This is bad since I should have just" \
1478  " created it !!!" % castor_dir
1479  self.logger.fatal(msg)
1480  raise Error(msg)
1481  if len(output) > 0:
1482  self.logger.warning("Output directory `%s' is not empty:" \
1483  " new jobs will fail to" \
1484  " copy back output" % \
1485  castor_dir)
1486 
1487  # End of create_and_check_castor_dirs.
1488 
1489  ##########
1490 
1491  def create_and_check_castor_dir(self, castor_dir):
1492  """Check existence of the give CASTOR dir, if necessary create
1493  it.
1494 
1495  Some special care has to be taken with several things like
1496  setting the correct permissions such that CRAB can store the
1497  output results. Of course this means that things like
1498  /castor/cern.ch/ and user/j/ have to be recognised and treated
1499  properly.
1500 
1501  NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1502  the moment.
1503 
1504  NOTE: This method uses some slightly tricky caching to make
1505  sure we don't keep over and over checking the same base paths.
1506 
1507  """
1508 
1509  ###
1510 
1511  # Local helper function to fully split a path into pieces.
1512  def split_completely(path):
1513  (parent_path, name) = os.path.split(path)
1514  if name == "":
1515  return (parent_path, )
1516  else:
1517  return split_completely(parent_path) + (name, )
1518 
1519  ###
1520 
1521  # Local helper function to check rfio (i.e. CASTOR)
1522  # directories.
1523  def extract_permissions(rfstat_output):
1524  """Parse the output from rfstat and return the
1525  5-digit permissions string."""
1526 
1527  permissions_line = [i for i in output.split("\n") \
1528  if i.lower().find("protection") > -1]
1529  regexp = re.compile(".*\(([0123456789]{5})\).*")
1530  match = regexp.search(rfstat_output)
1531  if not match or len(match.groups()) != 1:
1532  msg = "Could not extract permissions " \
1533  "from output: %s" % rfstat_output
1534  self.logger.fatal(msg)
1535  raise Error(msg)
1536  permissions = match.group(1)
1537 
1538  # End of extract_permissions.
1539  return permissions
1540 
1541  ###
1542 
1543  # These are the pieces of CASTOR directories that we do not
1544  # want to touch when modifying permissions.
1545 
1546  # NOTE: This is all a bit involved, basically driven by the
1547  # fact that one wants to treat the `j' directory of
1548  # `/castor/cern.ch/user/j/jhegeman/' specially.
1549  # BUG BUG BUG
1550  # This should be simplified, for example by comparing to the
1551  # CASTOR prefix or something like that.
1552  # BUG BUG BUG end
1553  castor_paths_dont_touch = {
1554  0: ["/", "castor", "cern.ch", "cms", "store", "temp",
1555  "dqm", "offline", "user"],
1556  -1: ["user", "store"]
1557  }
1558 
1559  self.logger.debug("Checking CASTOR path `%s'" % castor_dir)
1560 
1561  ###
1562 
1563  # First we take the full CASTOR path apart.
1564  castor_path_pieces = split_completely(castor_dir)
1565 
1566  # Now slowly rebuild the CASTOR path and see if a) all
1567  # permissions are set correctly and b) the final destination
1568  # exists.
1569  path = ""
1570  check_sizes = sorted(castor_paths_dont_touch.keys())
1571  len_castor_path_pieces = len(castor_path_pieces)
1572  for piece_index in range (len_castor_path_pieces):
1573  skip_this_path_piece = False
1574  piece = castor_path_pieces[piece_index]
1575 ## self.logger.debug("Checking CASTOR path piece `%s'" % \
1576 ## piece)
1577  for check_size in check_sizes:
1578  # Do we need to do anything with this?
1579  if (piece_index + check_size) > -1:
1580 ## self.logger.debug("Checking `%s' against `%s'" % \
1581 ## (castor_path_pieces[piece_index + check_size],
1582 ## castor_paths_dont_touch[check_size]))
1583  if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
1584 ## self.logger.debug(" skipping")
1585  skip_this_path_piece = True
1586 ## else:
1587 ## # Piece not in the list, fine.
1588 ## self.logger.debug(" accepting")
1589  # Add piece to the path we're building.
1590 ## self.logger.debug("!!! Skip path piece `%s'? %s" % \
1591 ## (piece, str(skip_this_path_piece)))
1592 ## self.logger.debug("Adding piece to path...")
1593  path = os.path.join(path, piece)
1594 ## self.logger.debug("Path is now `%s'" % \
1595 ## path)
1596 
1597  # Hmmmm, only at this point can we do some caching. Not
1598  # ideal, but okay.
1599  try:
1600  if path in self.castor_path_checks_cache:
1601  continue
1602  except AttributeError:
1603  # This only happens the first time around.
1604  self.castor_path_checks_cache = []
1605  self.castor_path_checks_cache.append(path)
1606 
1607  # Now, unless we're supposed to skip this piece of the
1608  # path, let's make sure it exists and set the permissions
1609  # correctly for use by CRAB. This means that:
1610  # - the final output directory should (at least) have
1611  # permissions 775
1612  # - all directories above that should (at least) have
1613  # permissions 755.
1614 
1615  # BUT: Even though the above permissions are the usual
1616  # ones to used when setting up CASTOR areas for grid job
1617  # output, there is one caveat in case multiple people are
1618  # working in the same CASTOR area. If user X creates
1619  # /a/b/c/ and user Y wants to create /a/b/d/ he/she does
1620  # not have sufficient rights. So: we set all dir
1621  # permissions to 775 to avoid this.
1622 
1623  if not skip_this_path_piece:
1624 
1625  # Ok, first thing: let's make sure this directory
1626  # exists.
1627  # NOTE: The nice complication is of course that the
1628  # usual os.path.isdir() etc. methods don't work for an
1629  # rfio filesystem. So we call rfstat and interpret an
1630  # error as meaning that the path does not exist.
1631  self.logger.debug("Checking if path `%s' exists" % \
1632  path)
1633  cmd = "rfstat %s" % path
1634  (status, output) = commands.getstatusoutput(cmd)
1635  if status != 0:
1636  # Path does not exist, let's try and create it.
1637  self.logger.debug("Creating path `%s'" % path)
1638  cmd = "nsmkdir -m 775 %s" % path
1639  (status, output) = commands.getstatusoutput(cmd)
1640  if status != 0:
1641  msg = "Could not create directory `%s'" % path
1642  self.logger.fatal(msg)
1643  raise Error(msg)
1644  cmd = "rfstat %s" % path
1645  (status, output) = commands.getstatusoutput(cmd)
1646  # Now check that it looks like a directory. If I'm not
1647  # mistaken one can deduce this from the fact that the
1648  # (octal) permissions string starts with `40' (instead
1649  # of `100').
1650  permissions = extract_permissions(output)
1651  if not permissions.startswith("40"):
1652  msg = "Path `%s' is not a directory(?)" % path
1653  self.logger.fatal(msg)
1654  raise Error(msg)
1655 
1656  # Figure out the current permissions for this
1657  # (partial) path.
1658  self.logger.debug("Checking permissions for path `%s'" % path)
1659  cmd = "rfstat %s" % path
1660  (status, output) = commands.getstatusoutput(cmd)
1661  if status != 0:
1662  msg = "Could not obtain permissions for directory `%s'" % \
1663  path
1664  self.logger.fatal(msg)
1665  raise Error(msg)
1666  # Take the last three digits of the permissions.
1667  permissions = extract_permissions(output)[-3:]
1668 
1669  # Now if necessary fix permissions.
1670  # NOTE: Be careful never to `downgrade' permissions.
1671  if piece_index == (len_castor_path_pieces - 1):
1672  # This means we're looking at the final
1673  # destination directory.
1674  permissions_target = "775"
1675  else:
1676  # `Only' an intermediate directory.
1677  permissions_target = "775"
1678 
1679  # Compare permissions.
1680  permissions_new = []
1681  for (i, j) in zip(permissions, permissions_target):
1682  permissions_new.append(str(max(int(i), int(j))))
1683  permissions_new = "".join(permissions_new)
1684  self.logger.debug(" current permissions: %s" % \
1685  permissions)
1686  self.logger.debug(" target permissions : %s" % \
1687  permissions_target)
1688  if permissions_new != permissions:
1689  # We have to modify the permissions.
1690  self.logger.debug("Changing permissions of `%s' " \
1691  "to %s (were %s)" % \
1692  (path, permissions_new, permissions))
1693  cmd = "rfchmod %s %s" % (permissions_new, path)
1694  (status, output) = commands.getstatusoutput(cmd)
1695  if status != 0:
1696  msg = "Could not change permissions for path `%s' " \
1697  "to %s" % (path, permissions_new)
1698  self.logger.fatal(msg)
1699  raise Error(msg)
1700 
1701  self.logger.debug(" Permissions ok (%s)" % permissions_new)
1702 
1703  # End of create_and_check_castor_dir.
1704 
1705  ##########
1706 
1707  def pick_a_site(self, sites, cmssw_version):
1708 
1709  # Create list of forbidden sites
1710  sites_forbidden = []
1711 
1712  if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
1713  self.caf_access = True
1714 
1715  if self.caf_access == False:
1716  sites_forbidden.append("caf.cern.ch")
1717 
1718  # These are the T1 sites. These are only forbidden if we're
1719  # running in non-T1 mode.
1720  # Source:
1721  # https://cmsweb.cern.ch/sitedb/sitelist/?naming_scheme=ce
1722  # Hard-coded, yes. Not nice, no.
1723 
1724  all_t1 = [
1725  "srm-cms.cern.ch",
1726  "ccsrm.in2p3.fr",
1727  "cmssrm-fzk.gridka.de",
1728  "cmssrm.fnal.gov",
1729  "gridka-dCache.fzk.de",
1730  "srm-cms.gridpp.rl.ac.uk",
1731  "srm.grid.sinica.edu.tw",
1732  "srm2.grid.sinica.edu.tw",
1733  "srmcms.pic.es",
1734  "storm-fe-cms.cr.cnaf.infn.it"
1735  ]
1736 
1737  country_codes = {
1738  "CAF" : "caf.cern.ch",
1739  "CH" : "srm-cms.cern.ch",
1740  "FR" : "ccsrm.in2p3.fr",
1741  "DE" : "cmssrm-fzk.gridka.de",
1742  "GOV" : "cmssrm.fnal.gov",
1743  "DE2" : "gridka-dCache.fzk.de",
1744  "UK" : "srm-cms.gridpp.rl.ac.uk",
1745  "TW" : "srm.grid.sinica.edu.tw",
1746  "TW2" : "srm2.grid.sinica.edu.tw",
1747  "ES" : "srmcms.pic.es",
1748  "IT" : "storm-fe-cms.cr.cnaf.infn.it"
1749  }
1750 
1751  if self.non_t1access:
1752  sites_forbidden.extend(all_t1)
1753 
1754  for site in sites_forbidden:
1755  if site in sites:
1756  sites.remove(site)
1757 
1758  if self.preferred_site in country_codes:
1759  self.preferred_site = country_codes[self.preferred_site]
1760 
1761  if self.preferred_site != "no preference":
1762  if self.preferred_site in sites:
1763  sites = [self.preferred_site]
1764  else:
1765  sites= []
1766 
1767  #print sites
1768 
1769  # Looks like we have to do some caching here, otherwise things
1770  # become waaaay toooo sloooooow. So that's what the
1771  # sites_and_versions_cache does.
1772 
1773  # NOTE: Keep this set to None!
1774  site_name = None
1775  cmd = None
1776  while len(sites) > 0 and \
1777  site_name is None:
1778 
1779  # Create list of t1_sites
1780  t1_sites = []
1781  for site in sites:
1782  if site in all_t1:
1783  t1_sites.append(site)
1784  if site == "caf.cern.ch":
1785  t1_sites.append(site)
1786 
1787  # If avilable pick preferred site
1788  #if self.preferred_site in sites:
1789  # se_name = self.preferred_site
1790  # Else, if available pick t1 site
1791 
1792  if len(t1_sites) > 0:
1793  se_name = choice(t1_sites)
1794  # Else pick any site
1795  else:
1796  se_name = choice(sites)
1797 
1798  # But check that it hosts the CMSSW version we want.
1799 
1800  if se_name in self.sites_and_versions_cache and \
1801  cmssw_version in self.sites_and_versions_cache[se_name]:
1802  if self.sites_and_versions_cache[se_name][cmssw_version]:
1803  site_name = se_name
1804  break
1805  else:
1806  self.logger.debug(" --> rejecting site `%s'" % se_name)
1807  sites.remove(se_name)
1808 
1809  else:
1810  self.logger.info("Checking if site `%s' " \
1811  "has CMSSW version `%s'" % \
1812  (se_name, cmssw_version))
1813  self.sites_and_versions_cache[se_name] = {}
1814 
1815  # TODO TODO TODO
1816  # Test for SCRAM architecture removed as per request
1817  # from Andreas.
1818  # scram_arch = os.getenv("SCRAM_ARCH")
1819  # cmd = "lcg-info --list-ce " \
1820  # "--query '" \
1821  # "Tag=VO-cms-%s," \
1822  # "Tag=VO-cms-%s," \
1823  # "CEStatus=Production," \
1824  # "CloseSE=%s'" % \
1825  # (cmssw_version, scram_arch, se_name)
1826  # TODO TODO TODO end
1827 
1828  cmd = "lcg-info --list-ce " \
1829  "--query '" \
1830  "Tag=VO-cms-%s," \
1831  "CEStatus=Production," \
1832  "CloseSE=%s'" % \
1833  (cmssw_version, se_name)
1834  (status, output) = commands.getstatusoutput(cmd)
1835  if status != 0:
1836  self.logger.error("Could not check site information " \
1837  "for site `%s'" % se_name)
1838  else:
1839  if (len(output) > 0) or (se_name == "caf.cern.ch"):
1840  self.sites_and_versions_cache[se_name][cmssw_version] = True
1841  site_name = se_name
1842  break
1843  else:
1844  self.sites_and_versions_cache[se_name][cmssw_version] = False
1845  self.logger.debug(" --> rejecting site `%s'" % se_name)
1846  sites.remove(se_name)
1847 
1848  if site_name is self.no_matching_site_found_str:
1849  self.logger.error(" --> no matching site found")
1850  self.logger.error(" --> Your release or SCRAM " \
1851  "architecture may not be available" \
1852  "anywhere on the (LCG) grid.")
1853  if not cmd is None:
1854  self.logger.debug(" (command used: `%s')" % cmd)
1855  else:
1856  self.logger.debug(" --> selected site `%s'" % site_name)
1857 
1858  # Return something more descriptive (than `None') in case we
1859  # found nothing.
1860  if site_name is None:
1861  site_name = self.no_matching_site_found_str
1862  # Keep track of our global flag signifying that this
1863  # happened.
1864  self.all_sites_found = False
1865 
1866  # End of pick_a_site.
1867  return site_name
1868 
1869  ##########
1870 
1872 
1873  # Set up the command line parser. Note that we fix up the help
1874  # formatter so that we can add some text pointing people to
1875  # the Twiki etc.
1876  parser = optparse.OptionParser(version="%s %s" % \
1877  ("%prog", self.version),
1878  formatter=CMSHarvesterHelpFormatter())
1879 
1880  self.option_parser = parser
1881 
1882  # The debug switch.
1883  parser.add_option("-d", "--debug",
1884  help="Switch on debug mode",
1885  action="callback",
1886  callback=self.option_handler_debug)
1887 
1888  # The quiet switch.
1889  parser.add_option("-q", "--quiet",
1890  help="Be less verbose",
1891  action="callback",
1892  callback=self.option_handler_quiet)
1893 
1894  # The force switch. If this switch is used sanity checks are
1895  # performed but failures do not lead to aborts. Use with care.
1896  parser.add_option("", "--force",
1897  help="Force mode. Do not abort on sanity check "
1898  "failures",
1899  action="callback",
1900  callback=self.option_handler_force)
1901 
1902  # Choose between the different kinds of harvesting.
1903  parser.add_option("", "--harvesting_type",
1904  help="Harvesting type: %s" % \
1905  ", ".join(self.harvesting_types),
1906  action="callback",
1907  callback=self.option_handler_harvesting_type,
1908  type="string",
1909  metavar="HARVESTING_TYPE")
1910 
1911  # Choose between single-step and two-step mode.
1912  parser.add_option("", "--harvesting_mode",
1913  help="Harvesting mode: %s (default = %s)" % \
1914  (", ".join(self.harvesting_modes),
1915  self.harvesting_mode_default),
1916  action="callback",
1917  callback=self.option_handler_harvesting_mode,
1918  type="string",
1919  metavar="HARVESTING_MODE")
1920 
1921  # Override the GlobalTag chosen by the cmsHarvester.
1922  parser.add_option("", "--globaltag",
1923  help="GlobalTag to use. Default is the ones " \
1924  "the dataset was created with for MC, for data" \
1925  "a GlobalTag has to be specified.",
1926  action="callback",
1927  callback=self.option_handler_globaltag,
1928  type="string",
1929  metavar="GLOBALTAG")
1930 
1931  # Allow switching off of reference histograms.
1932  parser.add_option("", "--no-ref-hists",
1933  help="Don't use any reference histograms",
1934  action="callback",
1935  callback=self.option_handler_no_ref_hists)
1936 
1937  # Allow the default (i.e. the one that should be used)
1938  # Frontier connection to be overridden.
1939  parser.add_option("", "--frontier-connection",
1940  help="Use this Frontier connection to find " \
1941  "GlobalTags and LocalTags (for reference " \
1942  "histograms).\nPlease only use this for " \
1943  "testing.",
1944  action="callback",
1945  callback=self.option_handler_frontier_connection,
1946  type="string",
1947  metavar="FRONTIER")
1948 
1949  # Similar to the above but specific to the Frontier connection
1950  # to be used for the GlobalTag.
1951  parser.add_option("", "--frontier-connection-for-globaltag",
1952  help="Use this Frontier connection to find " \
1953  "GlobalTags.\nPlease only use this for " \
1954  "testing.",
1955  action="callback",
1956  callback=self.option_handler_frontier_connection,
1957  type="string",
1958  metavar="FRONTIER")
1959 
1960  # Similar to the above but specific to the Frontier connection
1961  # to be used for the reference histograms.
1962  parser.add_option("", "--frontier-connection-for-refhists",
1963  help="Use this Frontier connection to find " \
1964  "LocalTags (for reference " \
1965  "histograms).\nPlease only use this for " \
1966  "testing.",
1967  action="callback",
1968  callback=self.option_handler_frontier_connection,
1969  type="string",
1970  metavar="FRONTIER")
1971 
1972  # Option to specify the name (or a regexp) of the dataset(s)
1973  # to be used.
1974  parser.add_option("", "--dataset",
1975  help="Name (or regexp) of dataset(s) to process",
1976  action="callback",
1977  #callback=self.option_handler_dataset_name,
1978  callback=self.option_handler_input_spec,
1979  type="string",
1980  #dest="self.input_name",
1981  metavar="DATASET")
1982 
1983  # Option to specify the name (or a regexp) of the dataset(s)
1984  # to be ignored.
1985  parser.add_option("", "--dataset-ignore",
1986  help="Name (or regexp) of dataset(s) to ignore",
1987  action="callback",
1988  callback=self.option_handler_input_spec,
1989  type="string",
1990  metavar="DATASET-IGNORE")
1991 
1992  # Option to specify the name (or a regexp) of the run(s)
1993  # to be used.
1994  parser.add_option("", "--runs",
1995  help="Run number(s) to process",
1996  action="callback",
1997  callback=self.option_handler_input_spec,
1998  type="string",
1999  metavar="RUNS")
2000 
2001  # Option to specify the name (or a regexp) of the run(s)
2002  # to be ignored.
2003  parser.add_option("", "--runs-ignore",
2004  help="Run number(s) to ignore",
2005  action="callback",
2006  callback=self.option_handler_input_spec,
2007  type="string",
2008  metavar="RUNS-IGNORE")
2009 
2010  # Option to specify a file containing a list of dataset names
2011  # (or regexps) to be used.
2012  parser.add_option("", "--datasetfile",
2013  help="File containing list of dataset names " \
2014  "(or regexps) to process",
2015  action="callback",
2016  #callback=self.option_handler_listfile_name,
2017  callback=self.option_handler_input_spec,
2018  type="string",
2019  #dest="self.input_name",
2020  metavar="DATASETFILE")
2021 
2022  # Option to specify a file containing a list of dataset names
2023  # (or regexps) to be ignored.
2024  parser.add_option("", "--datasetfile-ignore",
2025  help="File containing list of dataset names " \
2026  "(or regexps) to ignore",
2027  action="callback",
2028  callback=self.option_handler_input_spec,
2029  type="string",
2030  metavar="DATASETFILE-IGNORE")
2031 
2032  # Option to specify a file containing a list of runs to be
2033  # used.
2034  parser.add_option("", "--runslistfile",
2035  help="File containing list of run numbers " \
2036  "to process",
2037  action="callback",
2038  callback=self.option_handler_input_spec,
2039  type="string",
2040  metavar="RUNSLISTFILE")
2041 
2042  # Option to specify a file containing a list of runs
2043  # to be ignored.
2044  parser.add_option("", "--runslistfile-ignore",
2045  help="File containing list of run numbers " \
2046  "to ignore",
2047  action="callback",
2048  callback=self.option_handler_input_spec,
2049  type="string",
2050  metavar="RUNSLISTFILE-IGNORE")
2051 
2052  # Option to specify a Jsonfile contaning a list of runs
2053  # to be used.
2054  parser.add_option("", "--Jsonrunfile",
2055  help="Jsonfile containing dictionary of run/lumisections pairs. " \
2056  "All lumisections of runs contained in dictionary are processed.",
2057  action="callback",
2058  callback=self.option_handler_input_Jsonrunfile,
2059  type="string",
2060  metavar="JSONRUNFILE")
2061 
2062  # Option to specify a Jsonfile contaning a dictionary of run/lumisections pairs
2063  # to be used.
2064  parser.add_option("", "--Jsonfile",
2065  help="Jsonfile containing dictionary of run/lumisections pairs. " \
2066  "Only specified lumisections of runs contained in dictionary are processed.",
2067  action="callback",
2068  callback=self.option_handler_input_Jsonfile,
2069  type="string",
2070  metavar="JSONFILE")
2071 
2072  # Option to specify a ToDo file contaning a list of runs
2073  # to be used.
2074  parser.add_option("", "--todo-file",
2075  help="Todo file containing a list of runs to process.",
2076  action="callback",
2077  callback=self.option_handler_input_todofile,
2078  type="string",
2079  metavar="TODO-FILE")
2080 
2081  # Option to specify which file to use for the dataset name to
2082  # reference histogram name mappings.
2083  parser.add_option("", "--refhistmappingfile",
2084  help="File to be use for the reference " \
2085  "histogram mappings. Default: `%s'." % \
2086  self.ref_hist_mappings_file_name_default,
2087  action="callback",
2088  callback=self.option_handler_ref_hist_mapping_file,
2089  type="string",
2090  metavar="REFHISTMAPPING-FILE")
2091 
2092  # Specify the place in CASTOR where the output should go.
2093  # NOTE: Only output to CASTOR is supported for the moment,
2094  # since the central DQM results place is on CASTOR anyway.
2095  parser.add_option("", "--castordir",
2096  help="Place on CASTOR to store results. " \
2097  "Default: `%s'." % \
2098  self.castor_base_dir_default,
2099  action="callback",
2100  callback=self.option_handler_castor_dir,
2101  type="string",
2102  metavar="CASTORDIR")
2103 
2104  # Use this to try and create jobs that will run without
2105  # special `t1access' role.
2106  parser.add_option("", "--no-t1access",
2107  help="Try to create jobs that will run " \
2108  "without special `t1access' role",
2109  action="callback",
2110  callback=self.option_handler_no_t1access)
2111 
2112  # Use this to create jobs that may run on CAF
2113  parser.add_option("", "--caf-access",
2114  help="Crab jobs may run " \
2115  "on CAF",
2116  action="callback",
2117  callback=self.option_handler_caf_access)
2118 
2119  # set process.dqmSaver.saveByLumiSection=1 in harvesting cfg file
2120  parser.add_option("", "--saveByLumiSection",
2121  help="set saveByLumiSection=1 in harvesting cfg file",
2122  action="callback",
2123  callback=self.option_handler_saveByLumiSection)
2124 
2125  # Use this to enable automatic creation and submission of crab jobs
2126  parser.add_option("", "--automatic-crab-submission",
2127  help="Crab jobs are created and " \
2128  "submitted automatically",
2129  action="callback",
2130  callback=self.option_handler_crab_submission)
2131 
2132  # Option to set the max number of sites, each
2133  #job is submitted to
2134  parser.add_option("", "--max-sites",
2135  help="Max. number of sites each job is submitted to",
2136  action="callback",
2137  callback=self.option_handler_sites,
2138  type="int")
2139 
2140  # Option to set the preferred site
2141  parser.add_option("", "--site",
2142  help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
2143  srm-cms.cern.ch : CH \
2144  ccsrm.in2p3.fr : FR \
2145  cmssrm-fzk.gridka.de : DE \
2146  cmssrm.fnal.gov : GOV \
2147  gridka-dCache.fzk.de : DE2 \
2148  rm-cms.gridpp.rl.ac.uk : UK \
2149  srm.grid.sinica.edu.tw : TW \
2150  srm2.grid.sinica.edu.tw : TW2 \
2151  srmcms.pic.es : ES \
2152  storm-fe-cms.cr.cnaf.infn.it : IT",
2153  action="callback",
2154  callback=self.option_handler_preferred_site,
2155  type="string")
2156 
2157  # This is the command line flag to list all harvesting
2158  # type-to-sequence mappings.
2159  parser.add_option("-l", "--list",
2160  help="List all harvesting types and their" \
2161  "corresponding sequence names",
2162  action="callback",
2163  callback=self.option_handler_list_types)
2164 
2165  # If nothing was specified: tell the user how to do things the
2166  # next time and exit.
2167  # NOTE: We just use the OptParse standard way of doing this by
2168  # acting as if a '--help' was specified.
2169  if len(self.cmd_line_opts) < 1:
2170  self.cmd_line_opts = ["--help"]
2171 
2172  # Some trickery with the options. Why? Well, since these
2173  # options change the output level immediately from the option
2174  # handlers, the results differ depending on where they are on
2175  # the command line. Let's just make sure they are at the
2176  # front.
2177  # NOTE: Not very efficient or sophisticated, but it works and
2178  # it only has to be done once anyway.
2179  for i in ["-d", "--debug",
2180  "-q", "--quiet"]:
2181  if i in self.cmd_line_opts:
2182  self.cmd_line_opts.remove(i)
2183  self.cmd_line_opts.insert(0, i)
2184 
2185  # Everything is set up, now parse what we were given.
2186  parser.set_defaults()
2187  (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2188 
2189  # End of parse_cmd_line_options.
2190 
2191  ##########
2192 
2194  """Check completeness and correctness of input information.
2195 
2196  Check that all required information has been specified and
2197  that, at least as far as can be easily checked, it makes
2198  sense.
2199 
2200  NOTE: This is also where any default values are applied.
2201 
2202  """
2203 
2204  self.logger.info("Checking completeness/correctness of input...")
2205 
2206  # The cmsHarvester does not take (i.e. understand) any
2207  # arguments so there should not be any.
2208  if len(self.args) > 0:
2209  msg = "Sorry but I don't understand `%s'" % \
2210  (" ".join(self.args))
2211  self.logger.fatal(msg)
2212  raise Usage(msg)
2213 
2214  # BUG BUG BUG
2215  # While we wait for some bugs left and right to get fixed, we
2216  # disable two-step.
2217  if self.harvesting_mode == "two-step":
2218  msg = "--------------------\n" \
2219  " Sorry, but for the moment (well, till it works)" \
2220  " the two-step mode has been disabled.\n" \
2221  "--------------------\n"
2222  self.logger.fatal(msg)
2223  raise Error(msg)
2224  # BUG BUG BUG end
2225 
2226  # We need a harvesting method to be specified
2227  if self.harvesting_type is None:
2228  msg = "Please specify a harvesting type"
2229  self.logger.fatal(msg)
2230  raise Usage(msg)
2231  # as well as a harvesting mode.
2232  if self.harvesting_mode is None:
2233  self.harvesting_mode = self.harvesting_mode_default
2234  msg = "No harvesting mode specified --> using default `%s'" % \
2235  self.harvesting_mode
2236  self.logger.warning(msg)
2237  #raise Usage(msg)
2238 
2239  ###
2240 
2241  # We need an input method so we can find the dataset name(s).
2242  if self.input_method["datasets"]["use"] is None:
2243  msg = "Please specify an input dataset name " \
2244  "or a list file name"
2245  self.logger.fatal(msg)
2246  raise Usage(msg)
2247 
2248  # DEBUG DEBUG DEBUG
2249  # If we get here, we should also have an input name.
2250  assert not self.input_name["datasets"]["use"] is None
2251  # DEBUG DEBUG DEBUG end
2252 
2253  ###
2254 
2255  # The same holds for the reference histogram mapping file (if
2256  # we're using references).
2257  if self.use_ref_hists:
2258  if self.ref_hist_mappings_file_name is None:
2259  self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2260  msg = "No reference histogram mapping file specified --> " \
2261  "using default `%s'" % \
2262  self.ref_hist_mappings_file_name
2263  self.logger.warning(msg)
2264 
2265  ###
2266 
2267  # We need to know where to put the stuff (okay, the results)
2268  # on CASTOR.
2269  if self.castor_base_dir is None:
2270  self.castor_base_dir = self.castor_base_dir_default
2271  msg = "No CASTOR area specified -> using default `%s'" % \
2272  self.castor_base_dir
2273  self.logger.warning(msg)
2274  #raise Usage(msg)
2275 
2276  # Only the CERN CASTOR area is supported.
2277  if not self.castor_base_dir.startswith(self.castor_prefix):
2278  msg = "CASTOR area does not start with `%s'" % \
2279  self.castor_prefix
2280  self.logger.fatal(msg)
2281  if self.castor_base_dir.find("castor") > -1 and \
2282  not self.castor_base_dir.find("cern.ch") > -1:
2283  self.logger.fatal("Only CERN CASTOR is supported")
2284  raise Usage(msg)
2285 
2286  ###
2287 
2288  # TODO TODO TODO
2289  # This should be removed in the future, once I find out how to
2290  # get the config file used to create a given dataset from DBS.
2291 
2292  # For data we need to have a GlobalTag. (For MC we can figure
2293  # it out by ourselves.)
2294  if self.globaltag is None:
2295  self.logger.warning("No GlobalTag specified. This means I cannot")
2296  self.logger.warning("run on data, only on MC.")
2297  self.logger.warning("I will skip all data datasets.")
2298 
2299  # TODO TODO TODO end
2300 
2301  # Make sure the GlobalTag ends with `::All'.
2302  if not self.globaltag is None:
2303  if not self.globaltag.endswith("::All"):
2304  self.logger.warning("Specified GlobalTag `%s' does " \
2305  "not end in `::All' --> " \
2306  "appending this missing piece" % \
2307  self.globaltag)
2308  self.globaltag = "%s::All" % self.globaltag
2309 
2310  ###
2311 
2312  # Dump some info about the Frontier connections used.
2313  for (key, value) in six.iteritems(self.frontier_connection_name):
2314  frontier_type_str = "unknown"
2315  if key == "globaltag":
2316  frontier_type_str = "the GlobalTag"
2317  elif key == "refhists":
2318  frontier_type_str = "the reference histograms"
2319  non_str = None
2320  if self.frontier_connection_overridden[key] == True:
2321  non_str = "non-"
2322  else:
2323  non_str = ""
2324  self.logger.info("Using %sdefault Frontier " \
2325  "connection for %s: `%s'" % \
2326  (non_str, frontier_type_str, value))
2327 
2328  ###
2329 
2330  # End of check_input_status.
2331 
2332  ##########
2333 
2334  def check_cmssw(self):
2335  """Check if CMSSW is setup.
2336 
2337  """
2338 
2339  # Try to access the CMSSW_VERSION environment variable. If
2340  # it's something useful we consider CMSSW to be set up
2341  # properly. Otherwise we raise an error.
2342  cmssw_version = os.getenv("CMSSW_VERSION")
2343  if cmssw_version is None:
2344  self.logger.fatal("It seems CMSSW is not setup...")
2345  self.logger.fatal("($CMSSW_VERSION is empty)")
2346  raise Error("ERROR: CMSSW needs to be setup first!")
2347 
2348  self.cmssw_version = cmssw_version
2349  self.logger.info("Found CMSSW version %s properly set up" % \
2350  self.cmssw_version)
2351 
2352  # End of check_cmsssw.
2353  return True
2354 
2355  ##########
2356 
2357  def check_dbs(self):
2358  """Check if DBS is setup.
2359 
2360  """
2361 
2362  # Try to access the DBSCMD_HOME environment variable. If this
2363  # looks useful we consider DBS to be set up
2364  # properly. Otherwise we raise an error.
2365  dbs_home = os.getenv("DBSCMD_HOME")
2366  if dbs_home is None:
2367  self.logger.fatal("It seems DBS is not setup...")
2368  self.logger.fatal(" $DBSCMD_HOME is empty")
2369  raise Error("ERROR: DBS needs to be setup first!")
2370 
2371 ## # Now we try to do a very simple DBS search. If that works
2372 ## # instead of giving us the `Unsupported API call' crap, we
2373 ## # should be good to go.
2374 ## # NOTE: Not ideal, I know, but it reduces the amount of
2375 ## # complaints I get...
2376 ## cmd = "dbs search --query=\"find dataset where dataset = impossible\""
2377 ## (status, output) = commands.getstatusoutput(cmd)
2378 ## pdb.set_trace()
2379 ## if status != 0 or \
2380 ## output.lower().find("unsupported api call") > -1:
2381 ## self.logger.fatal("It seems DBS is not setup...")
2382 ## self.logger.fatal(" %s returns crap:" % cmd)
2383 ## for line in output.split("\n"):
2384 ## self.logger.fatal(" %s" % line)
2385 ## raise Error("ERROR: DBS needs to be setup first!")
2386 
2387  self.logger.debug("Found DBS properly set up")
2388 
2389  # End of check_dbs.
2390  return True
2391 
2392  ##########
2393 
2394  def setup_dbs(self):
2395  """Setup the Python side of DBS.
2396 
2397  For more information see the DBS Python API documentation:
2398  https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2399 
2400  """
2401 
2402  try:
2403  args={}
2404  args["url"]= "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2405  "servlet/DBSServlet"
2406  api = DbsApi(args)
2407  self.dbs_api = api
2408 
2409  except DBSAPI.dbsApiException.DbsApiException as ex:
2410  self.logger.fatal("Caught DBS API exception %s: %s " % \
2411  (ex.getClassName(), ex.getErrorMessage()))
2412  if ex.getErrorCode() not in (None, ""):
2413  logger.debug("DBS exception error code: ", ex.getErrorCode())
2414  raise
2415 
2416  # End of setup_dbs.
2417 
2418  ##########
2419 
2420  def dbs_resolve_dataset_name(self, dataset_name):
2421  """Use DBS to resolve a wildcarded dataset name.
2422 
2423  """
2424 
2425  # DEBUG DEBUG DEBUG
2426  # If we get here DBS should have been set up already.
2427  assert not self.dbs_api is None
2428  # DEBUG DEBUG DEBUG end
2429 
2430  # Some minor checking to make sure that whatever we've been
2431  # given as dataset name actually sounds like a dataset name.
2432  if not (dataset_name.startswith("/") and \
2433  dataset_name.endswith("RECO")):
2434  self.logger.warning("Dataset name `%s' does not sound " \
2435  "like a valid dataset name!" % \
2436  dataset_name)
2437 
2438  #----------
2439 
2440  api = self.dbs_api
2441  dbs_query = "find dataset where dataset like %s " \
2442  "and dataset.status = VALID" % \
2443  dataset_name
2444  try:
2445  api_result = api.executeQuery(dbs_query)
2446  except DBSAPI.dbsApiException.DbsApiException:
2447  msg = "ERROR: Could not execute DBS query"
2448  self.logger.fatal(msg)
2449  raise Error(msg)
2450 
2451  # Setup parsing.
2452  handler = DBSXMLHandler(["dataset"])
2453  parser = xml.sax.make_parser()
2454  parser.setContentHandler(handler)
2455 
2456  # Parse.
2457  try:
2458  xml.sax.parseString(api_result, handler)
2459  except SAXParseException:
2460  msg = "ERROR: Could not parse DBS server output"
2461  self.logger.fatal(msg)
2462  raise Error(msg)
2463 
2464  # DEBUG DEBUG DEBUG
2465  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2466  # DEBUG DEBUG DEBUG end
2467 
2468  # Extract the results.
2469  datasets = handler.results.values()[0]
2470 
2471  # End of dbs_resolve_dataset_name.
2472  return datasets
2473 
2474  ##########
2475 
2476  def dbs_resolve_cmssw_version(self, dataset_name):
2477  """Ask DBS for the CMSSW version used to create this dataset.
2478 
2479  """
2480 
2481  # DEBUG DEBUG DEBUG
2482  # If we get here DBS should have been set up already.
2483  assert not self.dbs_api is None
2484  # DEBUG DEBUG DEBUG end
2485 
2486  api = self.dbs_api
2487  dbs_query = "find algo.version where dataset = %s " \
2488  "and dataset.status = VALID" % \
2489  dataset_name
2490  try:
2491  api_result = api.executeQuery(dbs_query)
2492  except DBSAPI.dbsApiException.DbsApiException:
2493  msg = "ERROR: Could not execute DBS query"
2494  self.logger.fatal(msg)
2495  raise Error(msg)
2496 
2497  handler = DBSXMLHandler(["algo.version"])
2498  parser = xml.sax.make_parser()
2499  parser.setContentHandler(handler)
2500 
2501  try:
2502  xml.sax.parseString(api_result, handler)
2503  except SAXParseException:
2504  msg = "ERROR: Could not parse DBS server output"
2505  self.logger.fatal(msg)
2506  raise Error(msg)
2507 
2508  # DEBUG DEBUG DEBUG
2509  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2510  # DEBUG DEBUG DEBUG end
2511 
2512  cmssw_version = handler.results.values()[0]
2513 
2514  # DEBUG DEBUG DEBUG
2515  assert len(cmssw_version) == 1
2516  # DEBUG DEBUG DEBUG end
2517 
2518  cmssw_version = cmssw_version[0]
2519 
2520  # End of dbs_resolve_cmssw_version.
2521  return cmssw_version
2522 
2523  ##########
2524 
2525 ## def dbs_resolve_dataset_number_of_events(self, dataset_name):
2526 ## """Ask DBS across how many events this dataset has been spread
2527 ## out.
2528 
2529 ## This is especially useful to check that we do not submit a job
2530 ## supposed to run on a complete sample that is not contained at
2531 ## a single site.
2532 
2533 ## """
2534 
2535 ## # DEBUG DEBUG DEBUG
2536 ## # If we get here DBS should have been set up already.
2537 ## assert not self.dbs_api is None
2538 ## # DEBUG DEBUG DEBUG end
2539 
2540 ## api = self.dbs_api
2541 ## dbs_query = "find count(site) where dataset = %s " \
2542 ## "and dataset.status = VALID" % \
2543 ## dataset_name
2544 ## try:
2545 ## api_result = api.executeQuery(dbs_query)
2546 ## except DbsApiException:
2547 ## raise Error("ERROR: Could not execute DBS query")
2548 
2549 ## try:
2550 ## num_events = []
2551 ## class Handler(xml.sax.handler.ContentHandler):
2552 ## def startElement(self, name, attrs):
2553 ## if name == "result":
2554 ## num_events.append(str(attrs["COUNT_STORAGEELEMENT"]))
2555 ## xml.sax.parseString(api_result, Handler())
2556 ## except SAXParseException:
2557 ## raise Error("ERROR: Could not parse DBS server output")
2558 
2559 ## # DEBUG DEBUG DEBUG
2560 ## assert len(num_events) == 1
2561 ## # DEBUG DEBUG DEBUG end
2562 
2563 ## num_events = int(num_events[0])
2564 
2565 ## # End of dbs_resolve_dataset_number_of_events.
2566 ## return num_events
2567 
2568  ##########
2569 
2570  def dbs_resolve_runs(self, dataset_name):
2571  """Ask DBS for the list of runs in a given dataset.
2572 
2573  # NOTE: This does not (yet?) skip/remove empty runs. There is
2574  # a bug in the DBS entry run.numevents (i.e. it always returns
2575  # zero) which should be fixed in the `next DBS release'.
2576  # See also:
2577  # https://savannah.cern.ch/bugs/?53452
2578  # https://savannah.cern.ch/bugs/?53711
2579 
2580  """
2581 
2582  # TODO TODO TODO
2583  # We should remove empty runs as soon as the above mentioned
2584  # bug is fixed.
2585  # TODO TODO TODO end
2586 
2587  # DEBUG DEBUG DEBUG
2588  # If we get here DBS should have been set up already.
2589  assert not self.dbs_api is None
2590  # DEBUG DEBUG DEBUG end
2591 
2592  api = self.dbs_api
2593  dbs_query = "find run where dataset = %s " \
2594  "and dataset.status = VALID" % \
2595  dataset_name
2596  try:
2597  api_result = api.executeQuery(dbs_query)
2598  except DBSAPI.dbsApiException.DbsApiException:
2599  msg = "ERROR: Could not execute DBS query"
2600  self.logger.fatal(msg)
2601  raise Error(msg)
2602 
2603  handler = DBSXMLHandler(["run"])
2604  parser = xml.sax.make_parser()
2605  parser.setContentHandler(handler)
2606 
2607  try:
2608  xml.sax.parseString(api_result, handler)
2609  except SAXParseException:
2610  msg = "ERROR: Could not parse DBS server output"
2611  self.logger.fatal(msg)
2612  raise Error(msg)
2613 
2614  # DEBUG DEBUG DEBUG
2615  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2616  # DEBUG DEBUG DEBUG end
2617 
2618  runs = handler.results.values()[0]
2619  # Turn strings into integers.
2620  runs = sorted([int(i) for i in runs])
2621 
2622  # End of dbs_resolve_runs.
2623  return runs
2624 
2625  ##########
2626 
2627  def dbs_resolve_globaltag(self, dataset_name):
2628  """Ask DBS for the globaltag corresponding to a given dataset.
2629 
2630  # BUG BUG BUG
2631  # This does not seem to work for data datasets? E.g. for
2632  # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
2633  # Probaly due to the fact that the GlobalTag changed during
2634  # datataking...
2635  BUG BUG BUG end
2636 
2637  """
2638 
2639  # DEBUG DEBUG DEBUG
2640  # If we get here DBS should have been set up already.
2641  assert not self.dbs_api is None
2642  # DEBUG DEBUG DEBUG end
2643 
2644  api = self.dbs_api
2645  dbs_query = "find dataset.tag where dataset = %s " \
2646  "and dataset.status = VALID" % \
2647  dataset_name
2648  try:
2649  api_result = api.executeQuery(dbs_query)
2650  except DBSAPI.dbsApiException.DbsApiException:
2651  msg = "ERROR: Could not execute DBS query"
2652  self.logger.fatal(msg)
2653  raise Error(msg)
2654 
2655  handler = DBSXMLHandler(["dataset.tag"])
2656  parser = xml.sax.make_parser()
2657  parser.setContentHandler(parser)
2658 
2659  try:
2660  xml.sax.parseString(api_result, handler)
2661  except SAXParseException:
2662  msg = "ERROR: Could not parse DBS server output"
2663  self.logger.fatal(msg)
2664  raise Error(msg)
2665 
2666  # DEBUG DEBUG DEBUG
2667  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2668  # DEBUG DEBUG DEBUG end
2669 
2670  globaltag = handler.results.values()[0]
2671 
2672  # DEBUG DEBUG DEBUG
2673  assert len(globaltag) == 1
2674  # DEBUG DEBUG DEBUG end
2675 
2676  globaltag = globaltag[0]
2677 
2678  # End of dbs_resolve_globaltag.
2679  return globaltag
2680 
2681  ##########
2682 
2683  def dbs_resolve_datatype(self, dataset_name):
2684  """Ask DBS for the the data type (data or mc) of a given
2685  dataset.
2686 
2687  """
2688 
2689  # DEBUG DEBUG DEBUG
2690  # If we get here DBS should have been set up already.
2691  assert not self.dbs_api is None
2692  # DEBUG DEBUG DEBUG end
2693 
2694  api = self.dbs_api
2695  dbs_query = "find datatype.type where dataset = %s " \
2696  "and dataset.status = VALID" % \
2697  dataset_name
2698  try:
2699  api_result = api.executeQuery(dbs_query)
2700  except DBSAPI.dbsApiException.DbsApiException:
2701  msg = "ERROR: Could not execute DBS query"
2702  self.logger.fatal(msg)
2703  raise Error(msg)
2704 
2705  handler = DBSXMLHandler(["datatype.type"])
2706  parser = xml.sax.make_parser()
2707  parser.setContentHandler(handler)
2708 
2709  try:
2710  xml.sax.parseString(api_result, handler)
2711  except SAXParseException:
2712  msg = "ERROR: Could not parse DBS server output"
2713  self.logger.fatal(msg)
2714  raise Error(msg)
2715 
2716  # DEBUG DEBUG DEBUG
2717  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2718  # DEBUG DEBUG DEBUG end
2719 
2720  datatype = handler.results.values()[0]
2721 
2722  # DEBUG DEBUG DEBUG
2723  assert len(datatype) == 1
2724  # DEBUG DEBUG DEBUG end
2725 
2726  datatype = datatype[0]
2727 
2728  # End of dbs_resolve_datatype.
2729  return datatype
2730 
2731  ##########
2732 
2733  # OBSOLETE OBSOLETE OBSOLETE
2734  # This method is no longer used.
2735 
2736  def dbs_resolve_number_of_events(self, dataset_name, run_number=None):
2737  """Determine the number of events in a given dataset (and run).
2738 
2739  Ask DBS for the number of events in a dataset. If a run number
2740  is specified the number of events returned is that in that run
2741  of that dataset. If problems occur we throw an exception.
2742 
2743  # BUG BUG BUG
2744  # Since DBS does not return the number of events correctly,
2745  # neither for runs nor for whole datasets, we have to work
2746  # around that a bit...
2747  # BUG BUG BUG end
2748 
2749  """
2750 
2751  # DEBUG DEBUG DEBUG
2752  # If we get here DBS should have been set up already.
2753  assert not self.dbs_api is None
2754  # DEBUG DEBUG DEBUG end
2755 
2756  api = self.dbs_api
2757  dbs_query = "find file.name, file.numevents where dataset = %s " \
2758  "and dataset.status = VALID" % \
2759  dataset_name
2760  if not run_number is None:
2761  dbs_query = dbq_query + (" and run = %d" % run_number)
2762  try:
2763  api_result = api.executeQuery(dbs_query)
2764  except DBSAPI.dbsApiException.DbsApiException:
2765  msg = "ERROR: Could not execute DBS query"
2766  self.logger.fatal(msg)
2767  raise Error(msg)
2768 
2769  handler = DBSXMLHandler(["file.name", "file.numevents"])
2770  parser = xml.sax.make_parser()
2771  parser.setContentHandler(handler)
2772 
2773  try:
2774  xml.sax.parseString(api_result, handler)
2775  except SAXParseException:
2776  msg = "ERROR: Could not parse DBS server output"
2777  self.logger.fatal(msg)
2778  raise Error(msg)
2779 
2780  # DEBUG DEBUG DEBUG
2781  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2782  # DEBUG DEBUG DEBUG end
2783 
2784  num_events = sum(handler.results["file.numevents"])
2785 
2786  # End of dbs_resolve_number_of_events.
2787  return num_events
2788 
2789  # OBSOLETE OBSOLETE OBSOLETE end
2790 
2791  ##########
2792 
2793 ## def dbs_resolve_dataset_number_of_sites(self, dataset_name):
2794 ## """Ask DBS across how many sites this dataset has been spread
2795 ## out.
2796 
2797 ## This is especially useful to check that we do not submit a job
2798 ## supposed to run on a complete sample that is not contained at
2799 ## a single site.
2800 
2801 ## """
2802 
2803 ## # DEBUG DEBUG DEBUG
2804 ## # If we get here DBS should have been set up already.
2805 ## assert not self.dbs_api is None
2806 ## # DEBUG DEBUG DEBUG end
2807 
2808 ## api = self.dbs_api
2809 ## dbs_query = "find count(site) where dataset = %s " \
2810 ## "and dataset.status = VALID" % \
2811 ## dataset_name
2812 ## try:
2813 ## api_result = api.executeQuery(dbs_query)
2814 ## except DbsApiException:
2815 ## raise Error("ERROR: Could not execute DBS query")
2816 
2817 ## try:
2818 ## num_sites = []
2819 ## class Handler(xml.sax.handler.ContentHandler):
2820 ## def startElement(self, name, attrs):
2821 ## if name == "result":
2822 ## num_sites.append(str(attrs["COUNT_STORAGEELEMENT"]))
2823 ## xml.sax.parseString(api_result, Handler())
2824 ## except SAXParseException:
2825 ## raise Error("ERROR: Could not parse DBS server output")
2826 
2827 ## # DEBUG DEBUG DEBUG
2828 ## assert len(num_sites) == 1
2829 ## # DEBUG DEBUG DEBUG end
2830 
2831 ## num_sites = int(num_sites[0])
2832 
2833 ## # End of dbs_resolve_dataset_number_of_sites.
2834 ## return num_sites
2835 
2836  ##########
2837 
2838 ## def dbs_check_dataset_spread(self, dataset_name):
2839 ## """Figure out across how many sites this dataset is spread.
2840 
2841 ## NOTE: This is something we need to figure out per run, since
2842 ## we want to submit harvesting jobs per run.
2843 
2844 ## Basically three things can happen with a given dataset:
2845 ## - the whole dataset is available on a single site,
2846 ## - the whole dataset is available (mirrored) at multiple sites,
2847 ## - the dataset is spread across multiple sites and there is no
2848 ## single site containing the full dataset in one place.
2849 
2850 ## NOTE: If all goes well, it should not be possible that
2851 ## anything but a _full_ dataset is mirrored. So we ignore the
2852 ## possibility in which for example one site contains the full
2853 ## dataset and two others mirror half of it.
2854 ## ANOTHER NOTE: According to some people this last case _could_
2855 ## actually happen. I will not design for it, but make sure it
2856 ## ends up as a false negative, in which case we just loose some
2857 ## efficiency and treat the dataset (unnecessarily) as
2858 ## spread-out.
2859 
2860 ## We don't really care about the first two possibilities, but in
2861 ## the third case we need to make sure to run the harvesting in
2862 ## two-step mode.
2863 
2864 ## This method checks with DBS which of the above cases is true
2865 ## for the dataset name given, and returns a 1 for the first two
2866 ## cases, and the number of sites across which the dataset is
2867 ## spread for the third case.
2868 
2869 ## The way in which this is done is by asking how many files each
2870 ## site has for the dataset. In the first case there is only one
2871 ## site, in the second case all sites should have the same number
2872 ## of files (i.e. the total number of files in the dataset) and
2873 ## in the third case the file counts from all sites should add up
2874 ## to the total file count for the dataset.
2875 
2876 ## """
2877 
2878 ## # DEBUG DEBUG DEBUG
2879 ## # If we get here DBS should have been set up already.
2880 ## assert not self.dbs_api is None
2881 ## # DEBUG DEBUG DEBUG end
2882 
2883 ## api = self.dbs_api
2884 ## dbs_query = "find run, run.numevents, site, file.count " \
2885 ## "where dataset = %s " \
2886 ## "and dataset.status = VALID" % \
2887 ## dataset_name
2888 ## try:
2889 ## api_result = api.executeQuery(dbs_query)
2890 ## except DbsApiException:
2891 ## msg = "ERROR: Could not execute DBS query"
2892 ## self.logger.fatal(msg)
2893 ## raise Error(msg)
2894 
2895 ## # Index things by run number. No cross-check is done to make
2896 ## # sure we get results for each and every run in the
2897 ## # dataset. I'm not sure this would make sense since we'd be
2898 ## # cross-checking DBS info with DBS info anyway. Note that we
2899 ## # use the file count per site to see if we're dealing with an
2900 ## # incomplete vs. a mirrored dataset.
2901 ## sample_info = {}
2902 ## try:
2903 ## class Handler(xml.sax.handler.ContentHandler):
2904 ## def startElement(self, name, attrs):
2905 ## if name == "result":
2906 ## run_number = int(attrs["RUNS_RUNNUMBER"])
2907 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
2908 ## file_count = int(attrs["COUNT_FILES"])
2909 ## # BUG BUG BUG
2910 ## # Doh! For some reason DBS never returns any other
2911 ## # event count than zero.
2912 ## event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
2913 ## # BUG BUG BUG end
2914 ## info = (site_name, file_count, event_count)
2915 ## try:
2916 ## sample_info[run_number].append(info)
2917 ## except KeyError:
2918 ## sample_info[run_number] = [info]
2919 ## xml.sax.parseString(api_result, Handler())
2920 ## except SAXParseException:
2921 ## msg = "ERROR: Could not parse DBS server output"
2922 ## self.logger.fatal(msg)
2923 ## raise Error(msg)
2924 
2925 ## # Now translate this into a slightly more usable mapping.
2926 ## sites = {}
2927 ## for (run_number, site_info) in six.iteritems(sample_info):
2928 ## # Quick-n-dirty trick to see if all file counts are the
2929 ## # same.
2930 ## unique_file_counts = set([i[1] for i in site_info])
2931 ## if len(unique_file_counts) == 1:
2932 ## # Okay, so this must be a mirrored dataset.
2933 ## # We have to pick one but we have to be careful. We
2934 ## # cannot submit to things like a T0, a T1, or CAF.
2935 ## site_names = [self.pick_a_site([i[0] for i in site_info])]
2936 ## nevents = [site_info[0][2]]
2937 ## else:
2938 ## # Looks like this is a spread-out sample.
2939 ## site_names = [i[0] for i in site_info]
2940 ## nevents = [i[2] for i in site_info]
2941 ## sites[run_number] = zip(site_names, nevents)
2942 
2943 ## self.logger.debug("Sample `%s' spread is:" % dataset_name)
2944 ## run_numbers = sites.keys()
2945 ## run_numbers.sort()
2946 ## for run_number in run_numbers:
2947 ## self.logger.debug(" run # %6d: %d sites (%s)" % \
2948 ## (run_number,
2949 ## len(sites[run_number]),
2950 ## ", ".join([i[0] for i in sites[run_number]])))
2951 
2952 ## # End of dbs_check_dataset_spread.
2953 ## return sites
2954 
2955 ## # DEBUG DEBUG DEBUG
2956 ## # Just kept for debugging now.
2957 ## def dbs_check_dataset_spread_old(self, dataset_name):
2958 ## """Figure out across how many sites this dataset is spread.
2959 
2960 ## NOTE: This is something we need to figure out per run, since
2961 ## we want to submit harvesting jobs per run.
2962 
2963 ## Basically three things can happen with a given dataset:
2964 ## - the whole dataset is available on a single site,
2965 ## - the whole dataset is available (mirrored) at multiple sites,
2966 ## - the dataset is spread across multiple sites and there is no
2967 ## single site containing the full dataset in one place.
2968 
2969 ## NOTE: If all goes well, it should not be possible that
2970 ## anything but a _full_ dataset is mirrored. So we ignore the
2971 ## possibility in which for example one site contains the full
2972 ## dataset and two others mirror half of it.
2973 ## ANOTHER NOTE: According to some people this last case _could_
2974 ## actually happen. I will not design for it, but make sure it
2975 ## ends up as a false negative, in which case we just loose some
2976 ## efficiency and treat the dataset (unnecessarily) as
2977 ## spread-out.
2978 
2979 ## We don't really care about the first two possibilities, but in
2980 ## the third case we need to make sure to run the harvesting in
2981 ## two-step mode.
2982 
2983 ## This method checks with DBS which of the above cases is true
2984 ## for the dataset name given, and returns a 1 for the first two
2985 ## cases, and the number of sites across which the dataset is
2986 ## spread for the third case.
2987 
2988 ## The way in which this is done is by asking how many files each
2989 ## site has for the dataset. In the first case there is only one
2990 ## site, in the second case all sites should have the same number
2991 ## of files (i.e. the total number of files in the dataset) and
2992 ## in the third case the file counts from all sites should add up
2993 ## to the total file count for the dataset.
2994 
2995 ## """
2996 
2997 ## # DEBUG DEBUG DEBUG
2998 ## # If we get here DBS should have been set up already.
2999 ## assert not self.dbs_api is None
3000 ## # DEBUG DEBUG DEBUG end
3001 
3002 ## api = self.dbs_api
3003 ## dbs_query = "find run, run.numevents, site, file.count " \
3004 ## "where dataset = %s " \
3005 ## "and dataset.status = VALID" % \
3006 ## dataset_name
3007 ## try:
3008 ## api_result = api.executeQuery(dbs_query)
3009 ## except DbsApiException:
3010 ## msg = "ERROR: Could not execute DBS query"
3011 ## self.logger.fatal(msg)
3012 ## raise Error(msg)
3013 
3014 ## # Index things by run number. No cross-check is done to make
3015 ## # sure we get results for each and every run in the
3016 ## # dataset. I'm not sure this would make sense since we'd be
3017 ## # cross-checking DBS info with DBS info anyway. Note that we
3018 ## # use the file count per site to see if we're dealing with an
3019 ## # incomplete vs. a mirrored dataset.
3020 ## sample_info = {}
3021 ## try:
3022 ## class Handler(xml.sax.handler.ContentHandler):
3023 ## def startElement(self, name, attrs):
3024 ## if name == "result":
3025 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3026 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
3027 ## file_count = int(attrs["COUNT_FILES"])
3028 ## # BUG BUG BUG
3029 ## # Doh! For some reason DBS never returns any other
3030 ## # event count than zero.
3031 ## event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
3032 ## # BUG BUG BUG end
3033 ## info = (site_name, file_count, event_count)
3034 ## try:
3035 ## sample_info[run_number].append(info)
3036 ## except KeyError:
3037 ## sample_info[run_number] = [info]
3038 ## xml.sax.parseString(api_result, Handler())
3039 ## except SAXParseException:
3040 ## msg = "ERROR: Could not parse DBS server output"
3041 ## self.logger.fatal(msg)
3042 ## raise Error(msg)
3043 
3044 ## # Now translate this into a slightly more usable mapping.
3045 ## sites = {}
3046 ## for (run_number, site_info) in six.iteritems(sample_info):
3047 ## # Quick-n-dirty trick to see if all file counts are the
3048 ## # same.
3049 ## unique_file_counts = set([i[1] for i in site_info])
3050 ## if len(unique_file_counts) == 1:
3051 ## # Okay, so this must be a mirrored dataset.
3052 ## # We have to pick one but we have to be careful. We
3053 ## # cannot submit to things like a T0, a T1, or CAF.
3054 ## site_names = [self.pick_a_site([i[0] for i in site_info])]
3055 ## nevents = [site_info[0][2]]
3056 ## else:
3057 ## # Looks like this is a spread-out sample.
3058 ## site_names = [i[0] for i in site_info]
3059 ## nevents = [i[2] for i in site_info]
3060 ## sites[run_number] = zip(site_names, nevents)
3061 
3062 ## self.logger.debug("Sample `%s' spread is:" % dataset_name)
3063 ## run_numbers = sites.keys()
3064 ## run_numbers.sort()
3065 ## for run_number in run_numbers:
3066 ## self.logger.debug(" run # %6d: %d site(s) (%s)" % \
3067 ## (run_number,
3068 ## len(sites[run_number]),
3069 ## ", ".join([i[0] for i in sites[run_number]])))
3070 
3071 ## # End of dbs_check_dataset_spread_old.
3072 ## return sites
3073 ## # DEBUG DEBUG DEBUG end
3074 
3075  ##########
3076 
3077  def dbs_check_dataset_spread(self, dataset_name):
3078  """Figure out the number of events in each run of this dataset.
3079 
3080  This is a more efficient way of doing this than calling
3081  dbs_resolve_number_of_events for each run.
3082 
3083  """
3084 
3085  self.logger.debug("Checking spread of dataset `%s'" % dataset_name)
3086 
3087  # DEBUG DEBUG DEBUG
3088  # If we get here DBS should have been set up already.
3089  assert not self.dbs_api is None
3090  # DEBUG DEBUG DEBUG end
3091 
3092  api = self.dbs_api
3093  dbs_query = "find run.number, site, file.name, file.numevents " \
3094  "where dataset = %s " \
3095  "and dataset.status = VALID" % \
3096  dataset_name
3097  try:
3098  api_result = api.executeQuery(dbs_query)
3099  except DBSAPI.dbsApiException.DbsApiException:
3100  msg = "ERROR: Could not execute DBS query"
3101  self.logger.fatal(msg)
3102  raise Error(msg)
3103 
3104  handler = DBSXMLHandler(["run.number", "site", "file.name", "file.numevents"])
3105  parser = xml.sax.make_parser()
3106  parser.setContentHandler(handler)
3107 
3108  try:
3109  # OBSOLETE OBSOLETE OBSOLETE
3110 ## class Handler(xml.sax.handler.ContentHandler):
3111 ## def startElement(self, name, attrs):
3112 ## if name == "result":
3113 ## site_name = str(attrs["STORAGEELEMENT_SENAME"])
3114 ## # TODO TODO TODO
3115 ## # Ugly hack to get around cases like this:
3116 ## # $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3117 ## # Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3118 ## # Processing ... \
3119 ## # PATH STORAGEELEMENT_SENAME COUNT_FILES
3120 ## # _________________________________________________________________________________
3121 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO 1
3122 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO cmssrm.fnal.gov 12
3123 ## # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO srm-cms.cern.ch 12
3124 ## if len(site_name) < 1:
3125 ## return
3126 ## # TODO TODO TODO end
3127 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3128 ## file_name = str(attrs["FILES_LOGICALFILENAME"])
3129 ## nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3130 ## # I know, this is a bit of a kludge.
3131 ## if not files_info.has_key(run_number):
3132 ## # New run.
3133 ## files_info[run_number] = {}
3134 ## files_info[run_number][file_name] = (nevents,
3135 ## [site_name])
3136 ## elif not files_info[run_number].has_key(file_name):
3137 ## # New file for a known run.
3138 ## files_info[run_number][file_name] = (nevents,
3139 ## [site_name])
3140 ## else:
3141 ## # New entry for a known file for a known run.
3142 ## # DEBUG DEBUG DEBUG
3143 ## # Each file should have the same number of
3144 ## # events independent of the site it's at.
3145 ## assert nevents == files_info[run_number][file_name][0]
3146 ## # DEBUG DEBUG DEBUG end
3147 ## files_info[run_number][file_name][1].append(site_name)
3148  # OBSOLETE OBSOLETE OBSOLETE end
3149  xml.sax.parseString(api_result, handler)
3150  except SAXParseException:
3151  msg = "ERROR: Could not parse DBS server output"
3152  self.logger.fatal(msg)
3153  raise Error(msg)
3154 
3155  # DEBUG DEBUG DEBUG
3156  assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
3157  # DEBUG DEBUG DEBUG end
3158 
3159  # Now reshuffle all results a bit so we can more easily use
3160  # them later on. (Remember that all arrays in the results
3161  # should have equal length.)
3162  files_info = {}
3163  for (index, site_name) in enumerate(handler.results["site"]):
3164  # Ugly hack to get around cases like this:
3165  # $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3166  # Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3167  # Processing ... \
3168  # PATH STORAGEELEMENT_SENAME COUNT_FILES
3169  # _________________________________________________________________________________
3170  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO 1
3171  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO cmssrm.fnal.gov 12
3172  # /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO srm-cms.cern.ch 12
3173  if len(site_name) < 1:
3174  continue
3175  run_number = int(handler.results["run.number"][index])
3176  file_name = handler.results["file.name"][index]
3177  nevents = int(handler.results["file.numevents"][index])
3178 
3179  # I know, this is a bit of a kludge.
3180  if run_number not in files_info:
3181  # New run.
3182  files_info[run_number] = {}
3183  files_info[run_number][file_name] = (nevents,
3184  [site_name])
3185  elif file_name not in files_info[run_number]:
3186  # New file for a known run.
3187  files_info[run_number][file_name] = (nevents,
3188  [site_name])
3189  else:
3190  # New entry for a known file for a known run.
3191  # DEBUG DEBUG DEBUG
3192  # Each file should have the same number of
3193  # events independent of the site it's at.
3194  assert nevents == files_info[run_number][file_name][0]
3195  # DEBUG DEBUG DEBUG end
3196  files_info[run_number][file_name][1].append(site_name)
3197 
3198  # Remove any information for files that are not available
3199  # anywhere. NOTE: After introducing the ugly hack above, this
3200  # is a bit redundant, but let's keep it for the moment.
3201  for run_number in files_info.keys():
3202  files_without_sites = [i for (i, j) in \
3203  files_info[run_number].items() \
3204  if len(j[1]) < 1]
3205  if len(files_without_sites) > 0:
3206  self.logger.warning("Removing %d file(s)" \
3207  " with empty site names" % \
3208  len(files_without_sites))
3209  for file_name in files_without_sites:
3210  del files_info[run_number][file_name]
3211  # files_info[run_number][file_name] = (files_info \
3212  # [run_number] \
3213  # [file_name][0], [])
3214 
3215  # And another bit of a kludge.
3216  num_events_catalog = {}
3217  for run_number in files_info.keys():
3218  site_names = list(set([j for i in files_info[run_number].values() for j in i[1]]))
3219 
3220  # NOTE: The term `mirrored' does not have the usual
3221  # meaning here. It basically means that we can apply
3222  # single-step harvesting.
3223  mirrored = None
3224  if len(site_names) > 1:
3225  # Now we somehow need to figure out if we're dealing
3226  # with a mirrored or a spread-out dataset. The rule we
3227  # use here is that we're dealing with a spread-out
3228  # dataset unless we can find at least one site
3229  # containing exactly the full list of files for this
3230  # dataset that DBS knows about. In that case we just
3231  # use only that site.
3232  all_file_names = files_info[run_number].keys()
3233  all_file_names = set(all_file_names)
3234  sites_with_complete_copies = []
3235  for site_name in site_names:
3236  files_at_site = [i for (i, (j, k)) \
3237  in files_info[run_number].items() \
3238  if site_name in k]
3239  files_at_site = set(files_at_site)
3240  if files_at_site == all_file_names:
3241  sites_with_complete_copies.append(site_name)
3242  if len(sites_with_complete_copies) < 1:
3243  # This dataset/run is available at more than one
3244  # site, but no one has a complete copy. So this is
3245  # a spread-out sample.
3246  mirrored = False
3247  else:
3248  if len(sites_with_complete_copies) > 1:
3249  # This sample is available (and complete) at
3250  # more than one site. Definitely mirrored.
3251  mirrored = True
3252  else:
3253  # This dataset/run is available at more than
3254  # one site and at least one of them has a
3255  # complete copy. Even if this is only a single
3256  # site, let's call this `mirrored' and run the
3257  # single-step harvesting.
3258  mirrored = True
3259 
3260 ## site_names_ref = set(files_info[run_number].values()[0][1])
3261 ## for site_names_tmp in files_info[run_number].values()[1:]:
3262 ## if set(site_names_tmp[1]) != site_names_ref:
3263 ## mirrored = False
3264 ## break
3265 
3266  if mirrored:
3267  self.logger.debug(" -> run appears to be `mirrored'")
3268  else:
3269  self.logger.debug(" -> run appears to be spread-out")
3270 
3271  if mirrored and \
3272  len(sites_with_complete_copies) != len(site_names):
3273  # Remove any references to incomplete sites if we
3274  # have at least one complete site (and if there
3275  # are incomplete sites).
3276  for (file_name, (i, sites)) in files_info[run_number].items():
3277  complete_sites = [site for site in sites \
3278  if site in sites_with_complete_copies]
3279  files_info[run_number][file_name] = (i, complete_sites)
3280  site_names = sites_with_complete_copies
3281 
3282  self.logger.debug(" for run #%d:" % run_number)
3283  num_events_catalog[run_number] = {}
3284  num_events_catalog[run_number]["all_sites"] = sum([i[0] for i in files_info[run_number].values()])
3285  if len(site_names) < 1:
3286  self.logger.debug(" run is not available at any site")
3287  self.logger.debug(" (but should contain %d events" % \
3288  num_events_catalog[run_number]["all_sites"])
3289  else:
3290  self.logger.debug(" at all sites combined there are %d events" % \
3291  num_events_catalog[run_number]["all_sites"])
3292  for site_name in site_names:
3293  num_events_catalog[run_number][site_name] = sum([i[0] for i in files_info[run_number].values() if site_name in i[1]])
3294  self.logger.debug(" at site `%s' there are %d events" % \
3295  (site_name, num_events_catalog[run_number][site_name]))
3296  num_events_catalog[run_number]["mirrored"] = mirrored
3297 
3298  # End of dbs_check_dataset_spread.
3299  return num_events_catalog
3300 
3301  # Beginning of old version.
3302 ## def dbs_check_dataset_num_events(self, dataset_name):
3303 ## """Figure out the number of events in each run of this dataset.
3304 
3305 ## This is a more efficient way of doing this than calling
3306 ## dbs_resolve_number_of_events for each run.
3307 
3308 ## # BUG BUG BUG
3309 ## # This might very well not work at all for spread-out samples. (?)
3310 ## # BUG BUG BUG end
3311 
3312 ## """
3313 
3314 ## # DEBUG DEBUG DEBUG
3315 ## # If we get here DBS should have been set up already.
3316 ## assert not self.dbs_api is None
3317 ## # DEBUG DEBUG DEBUG end
3318 
3319 ## api = self.dbs_api
3320 ## dbs_query = "find run.number, file.name, file.numevents where dataset = %s " \
3321 ## "and dataset.status = VALID" % \
3322 ## dataset_name
3323 ## try:
3324 ## api_result = api.executeQuery(dbs_query)
3325 ## except DbsApiException:
3326 ## msg = "ERROR: Could not execute DBS query"
3327 ## self.logger.fatal(msg)
3328 ## raise Error(msg)
3329 
3330 ## try:
3331 ## files_info = {}
3332 ## class Handler(xml.sax.handler.ContentHandler):
3333 ## def startElement(self, name, attrs):
3334 ## if name == "result":
3335 ## run_number = int(attrs["RUNS_RUNNUMBER"])
3336 ## file_name = str(attrs["FILES_LOGICALFILENAME"])
3337 ## nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3338 ## try:
3339 ## files_info[run_number][file_name] = nevents
3340 ## except KeyError:
3341 ## files_info[run_number] = {file_name: nevents}
3342 ## xml.sax.parseString(api_result, Handler())
3343 ## except SAXParseException:
3344 ## msg = "ERROR: Could not parse DBS server output"
3345 ## self.logger.fatal(msg)
3346 ## raise Error(msg)
3347 
3348 ## num_events_catalog = {}
3349 ## for run_number in files_info.keys():
3350 ## num_events_catalog[run_number] = sum(files_info[run_number].values())
3351 
3352 ## # End of dbs_check_dataset_num_events.
3353 ## return num_events_catalog
3354  # End of old version.
3355 
3356  ##########
3357 
3358  def build_dataset_list(self, input_method, input_name):
3359  """Build a list of all datasets to be processed.
3360 
3361  """
3362 
3363  dataset_names = []
3364 
3365  # It may be, but only for the list of datasets to ignore, that
3366  # the input method and name are None because nothing was
3367  # specified. In that case just an empty list is returned.
3368  if input_method is None:
3369  pass
3370  elif input_method == "dataset":
3371  # Input comes from a dataset name directly on the command
3372  # line. But, this can also contain wildcards so we need
3373  # DBS to translate it conclusively into a list of explicit
3374  # dataset names.
3375  self.logger.info("Asking DBS for dataset names")
3376  dataset_names = self.dbs_resolve_dataset_name(input_name)
3377  elif input_method == "datasetfile":
3378  # In this case a file containing a list of dataset names
3379  # is specified. Still, each line may contain wildcards so
3380  # this step also needs help from DBS.
3381  # NOTE: Lines starting with a `#' are ignored.
3382  self.logger.info("Reading input from list file `%s'" % \
3383  input_name)
3384  try:
3385  listfile = open("/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name, "r")
3386  print("open listfile")
3387  for dataset in listfile:
3388  # Skip empty lines.
3389  dataset_stripped = dataset.strip()
3390  if len(dataset_stripped) < 1:
3391  continue
3392  # Skip lines starting with a `#'.
3393  if dataset_stripped[0] != "#":
3394  dataset_names.extend(self. \
3395  dbs_resolve_dataset_name(dataset_stripped))
3396  listfile.close()
3397  except IOError:
3398  msg = "ERROR: Could not open input list file `%s'" % \
3399  input_name
3400  self.logger.fatal(msg)
3401  raise Error(msg)
3402  else:
3403  # DEBUG DEBUG DEBUG
3404  # We should never get here.
3405  assert False, "Unknown input method `%s'" % input_method
3406  # DEBUG DEBUG DEBUG end
3407 
3408  # Remove duplicates from the dataset list.
3409  # NOTE: There should not be any duplicates in any list coming
3410  # from DBS, but maybe the user provided a list file with less
3411  # care.
3412  # Store for later use.
3413  dataset_names = sorted(set(dataset_names))
3414 
3415 
3416  # End of build_dataset_list.
3417  return dataset_names
3418 
3419  ##########
3420 
3422  """Build a list of datasets to process.
3423 
3424  """
3425 
3426  self.logger.info("Building list of datasets to consider...")
3427 
3428  input_method = self.input_method["datasets"]["use"]
3429  input_name = self.input_name["datasets"]["use"]
3430  dataset_names = self.build_dataset_list(input_method,
3431  input_name)
3432  self.datasets_to_use = dict(list(zip(dataset_names,
3433  [None] * len(dataset_names))))
3434 
3435  self.logger.info(" found %d dataset(s) to process:" % \
3436  len(dataset_names))
3437  for dataset in dataset_names:
3438  self.logger.info(" `%s'" % dataset)
3439 
3440  # End of build_dataset_use_list.
3441 
3442  ##########
3443 
3445  """Build a list of datasets to ignore.
3446 
3447  NOTE: We should always have a list of datasets to process, but
3448  it may be that we don't have a list of datasets to ignore.
3449 
3450  """
3451 
3452  self.logger.info("Building list of datasets to ignore...")
3453 
3454  input_method = self.input_method["datasets"]["ignore"]
3455  input_name = self.input_name["datasets"]["ignore"]
3456  dataset_names = self.build_dataset_list(input_method,
3457  input_name)
3458  self.datasets_to_ignore = dict(list(zip(dataset_names,
3459  [None] * len(dataset_names))))
3460 
3461  self.logger.info(" found %d dataset(s) to ignore:" % \
3462  len(dataset_names))
3463  for dataset in dataset_names:
3464  self.logger.info(" `%s'" % dataset)
3465 
3466  # End of build_dataset_ignore_list.
3467 
3468  ##########
3469 
3470  def build_runs_list(self, input_method, input_name):
3471 
3472  runs = []
3473 
3474  # A list of runs (either to use or to ignore) is not
3475  # required. This protects against `empty cases.'
3476  if input_method is None:
3477  pass
3478  elif input_method == "runs":
3479  # A list of runs was specified directly from the command
3480  # line.
3481  self.logger.info("Reading list of runs from the " \
3482  "command line")
3483  runs.extend([int(i.strip()) \
3484  for i in input_name.split(",") \
3485  if len(i.strip()) > 0])
3486  elif input_method == "runslistfile":
3487  # We were passed a file containing a list of runs.
3488  self.logger.info("Reading list of runs from file `%s'" % \
3489  input_name)
3490  try:
3491  listfile = open(input_name, "r")
3492  for run in listfile:
3493  # Skip empty lines.
3494  run_stripped = run.strip()
3495  if len(run_stripped) < 1:
3496  continue
3497  # Skip lines starting with a `#'.
3498  if run_stripped[0] != "#":
3499  runs.append(int(run_stripped))
3500  listfile.close()
3501  except IOError:
3502  msg = "ERROR: Could not open input list file `%s'" % \
3503  input_name
3504  self.logger.fatal(msg)
3505  raise Error(msg)
3506 
3507  else:
3508  # DEBUG DEBUG DEBUG
3509  # We should never get here.
3510  assert False, "Unknown input method `%s'" % input_method
3511  # DEBUG DEBUG DEBUG end
3512 
3513  # Remove duplicates, sort and done.
3514  runs = list(set(runs))
3515 
3516  # End of build_runs_list().
3517  return runs
3518 
3519  ##########
3520 
3522  """Build a list of runs to process.
3523 
3524  """
3525 
3526  self.logger.info("Building list of runs to consider...")
3527 
3528  input_method = self.input_method["runs"]["use"]
3529  input_name = self.input_name["runs"]["use"]
3530  runs = self.build_runs_list(input_method, input_name)
3531  self.runs_to_use = dict(list(zip(runs, [None] * len(runs))))
3532 
3533  self.logger.info(" found %d run(s) to process:" % \
3534  len(runs))
3535  if len(runs) > 0:
3536  self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3537 
3538  # End of build_runs_list().
3539 
3540  ##########
3541 
3543  """Build a list of runs to ignore.
3544 
3545  NOTE: We should always have a list of runs to process, but
3546  it may be that we don't have a list of runs to ignore.
3547 
3548  """
3549 
3550  self.logger.info("Building list of runs to ignore...")
3551 
3552  input_method = self.input_method["runs"]["ignore"]
3553  input_name = self.input_name["runs"]["ignore"]
3554  runs = self.build_runs_list(input_method, input_name)
3555  self.runs_to_ignore = dict(list(zip(runs, [None] * len(runs))))
3556 
3557  self.logger.info(" found %d run(s) to ignore:" % \
3558  len(runs))
3559  if len(runs) > 0:
3560  self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3561 
3562  # End of build_runs_ignore_list().
3563 
3564  ##########
3565 
3567  """Update the list of datasets taking into account the ones to
3568  ignore.
3569 
3570  Both lists have been generated before from DBS and both are
3571  assumed to be unique.
3572 
3573  NOTE: The advantage of creating the ignore list from DBS (in
3574  case a regexp is given) and matching that instead of directly
3575  matching the ignore criterion against the list of datasets (to
3576  consider) built from DBS is that in the former case we're sure
3577  that all regexps are treated exactly as DBS would have done
3578  without the cmsHarvester.
3579 
3580  NOTE: This only removes complete samples. Exclusion of single
3581  runs is done by the book keeping. So the assumption is that a
3582  user never wants to harvest just part (i.e. n out of N runs)
3583  of a sample.
3584 
3585  """
3586 
3587  self.logger.info("Processing list of datasets to ignore...")
3588 
3589  self.logger.debug("Before processing ignore list there are %d " \
3590  "datasets in the list to be processed" % \
3591  len(self.datasets_to_use))
3592 
3593  # Simple approach: just loop and search.
3594  dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3595  for dataset_name in self.datasets_to_use.keys():
3596  if dataset_name in self.datasets_to_ignore.keys():
3597  del dataset_names_filtered[dataset_name]
3598 
3599  self.logger.info(" --> Removed %d dataset(s)" % \
3600  (len(self.datasets_to_use) -
3601  len(dataset_names_filtered)))
3602 
3603  self.datasets_to_use = dataset_names_filtered
3604 
3605  self.logger.debug("After processing ignore list there are %d " \
3606  "datasets in the list to be processed" % \
3607  len(self.datasets_to_use))
3608 
3609  # End of process_dataset_ignore_list.
3610 
3611  ##########
3612 
3614 
3615  self.logger.info("Processing list of runs to use and ignore...")
3616 
3617  # This basically adds all runs in a dataset to be processed,
3618  # except for any runs that are not specified in the `to use'
3619  # list and any runs that are specified in the `to ignore'
3620  # list.
3621 
3622  # NOTE: It is assumed that those lists make sense. The input
3623  # should be checked against e.g. overlapping `use' and
3624  # `ignore' lists.
3625 
3626  runs_to_use = self.runs_to_use
3627  runs_to_ignore = self.runs_to_ignore
3628 
3629  for dataset_name in self.datasets_to_use:
3630  runs_in_dataset = self.datasets_information[dataset_name]["runs"]
3631 
3632  # First some sanity checks.
3633  runs_to_use_tmp = []
3634  for run in runs_to_use:
3635  if not run in runs_in_dataset:
3636  self.logger.warning("Dataset `%s' does not contain " \
3637  "requested run %d " \
3638  "--> ignoring `use' of this run" % \
3639  (dataset_name, run))
3640  else:
3641  runs_to_use_tmp.append(run)
3642 
3643  if len(runs_to_use) > 0:
3644  runs = runs_to_use_tmp
3645  self.logger.info("Using %d out of %d runs " \
3646  "of dataset `%s'" % \
3647  (len(runs), len(runs_in_dataset),
3648  dataset_name))
3649  else:
3650  runs = runs_in_dataset
3651 
3652  if len(runs_to_ignore) > 0:
3653  runs_tmp = []
3654  for run in runs:
3655  if not run in runs_to_ignore:
3656  runs_tmp.append(run)
3657  self.logger.info("Ignoring %d out of %d runs " \
3658  "of dataset `%s'" % \
3659  (len(runs)- len(runs_tmp),
3660  len(runs_in_dataset),
3661  dataset_name))
3662  runs = runs_tmp
3663 
3664  if self.todofile != "YourToDofile.txt":
3665  runs_todo = []
3666  print("Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile)
3667  cmd="grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3668  (status, output)=commands.getstatusoutput(cmd)
3669  for run in runs:
3670  run_str="%s" %run
3671  if run_str in output:
3672  runs_todo.append(run)
3673  self.logger.info("Using %d runs " \
3674  "of dataset `%s'" % \
3675  (len(runs_todo),
3676  dataset_name))
3677  runs=runs_todo
3678 
3679  Json_runs = []
3680  if self.Jsonfilename != "YourJSON.txt":
3681  good_runs = []
3682  self.Jsonlumi = True
3683  # We were passed a Jsonfile containing a dictionary of
3684  # run/lunisection-pairs
3685  self.logger.info("Reading runs and lumisections from file `%s'" % \
3686  self.Jsonfilename)
3687  try:
3688  Jsonfile = open(self.Jsonfilename, "r")
3689  for names in Jsonfile:
3690  dictNames= eval(str(names))
3691  for key in dictNames:
3692  intkey=int(key)
3693  Json_runs.append(intkey)
3694  Jsonfile.close()
3695  except IOError:
3696  msg = "ERROR: Could not open Jsonfile `%s'" % \
3697  input_name
3698  self.logger.fatal(msg)
3699  raise Error(msg)
3700  for run in runs:
3701  if run in Json_runs:
3702  good_runs.append(run)
3703  self.logger.info("Using %d runs " \
3704  "of dataset `%s'" % \
3705  (len(good_runs),
3706  dataset_name))
3707  runs=good_runs
3708  if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):
3709  good_runs = []
3710  # We were passed a Jsonfile containing a dictionary of
3711  # run/lunisection-pairs
3712  self.logger.info("Reading runs from file `%s'" % \
3713  self.Jsonrunfilename)
3714  try:
3715  Jsonfile = open(self.Jsonrunfilename, "r")
3716  for names in Jsonfile:
3717  dictNames= eval(str(names))
3718  for key in dictNames:
3719  intkey=int(key)
3720  Json_runs.append(intkey)
3721  Jsonfile.close()
3722  except IOError:
3723  msg = "ERROR: Could not open Jsonfile `%s'" % \
3724  input_name
3725  self.logger.fatal(msg)
3726  raise Error(msg)
3727  for run in runs:
3728  if run in Json_runs:
3729  good_runs.append(run)
3730  self.logger.info("Using %d runs " \
3731  "of dataset `%s'" % \
3732  (len(good_runs),
3733  dataset_name))
3734  runs=good_runs
3735 
3736  self.datasets_to_use[dataset_name] = runs
3737 
3738  # End of process_runs_use_and_ignore_lists().
3739 
3740  ##########
3741 
3743  """Remove all but the largest part of all datasets.
3744 
3745  This allows us to harvest at least part of these datasets
3746  using single-step harvesting until the two-step approach
3747  works.
3748 
3749  """
3750 
3751  # DEBUG DEBUG DEBUG
3752  assert self.harvesting_mode == "single-step-allow-partial"
3753  # DEBUG DEBUG DEBUG end
3754 
3755  for dataset_name in self.datasets_to_use:
3756  for run_number in self.datasets_information[dataset_name]["runs"]:
3757  max_events = max(self.datasets_information[dataset_name]["sites"][run_number].values())
3758  sites_with_max_events = [i[0] for i in self.datasets_information[dataset_name]["sites"][run_number].items() if i[1] == max_events]
3759  self.logger.warning("Singlifying dataset `%s', " \
3760  "run %d" % \
3761  (dataset_name, run_number))
3762  cmssw_version = self.datasets_information[dataset_name] \
3763  ["cmssw_version"]
3764  selected_site = self.pick_a_site(sites_with_max_events,
3765  cmssw_version)
3766 
3767  # Let's tell the user that we're manhandling this dataset.
3768  nevents_old = self.datasets_information[dataset_name]["num_events"][run_number]
3769  self.logger.warning(" --> " \
3770  "only harvesting partial statistics: " \
3771  "%d out of %d events (5.1%f%%) " \
3772  "at site `%s'" % \
3773  (max_events,
3774  nevents_old,
3775  100. * max_events / nevents_old,
3776  selected_site))
3777  self.logger.warning("!!! Please note that the number of " \
3778  "events in the output path name will " \
3779  "NOT reflect the actual statistics in " \
3780  "the harvested results !!!")
3781 
3782  # We found the site with the highest statistics and
3783  # the corresponding number of events. (CRAB gets upset
3784  # if we ask for more events than there are at a given
3785  # site.) Now update this information in our main
3786  # datasets_information variable.
3787  self.datasets_information[dataset_name]["sites"][run_number] = {selected_site: max_events}
3788  self.datasets_information[dataset_name]["num_events"][run_number] = max_events
3789  #self.datasets_information[dataset_name]["sites"][run_number] = [selected_site]
3790 
3791  # End of singlify_datasets.
3792 
3793  ##########
3794 
3796  """Check list of dataset names for impossible ones.
3797 
3798  Two kinds of checks are done:
3799  - Checks for things that do not make sense. These lead to
3800  errors and skipped datasets.
3801  - Sanity checks. For these warnings are issued but the user is
3802  considered to be the authoritative expert.
3803 
3804  Checks performed:
3805  - The CMSSW version encoded in the dataset name should match
3806  self.cmssw_version. This is critical.
3807  - There should be some events in the dataset/run. This is
3808  critical in the sense that CRAB refuses to create jobs for
3809  zero events. And yes, this does happen in practice. E.g. the
3810  reprocessed CRAFT08 datasets contain runs with zero events.
3811  - A cursory check is performed to see if the harvesting type
3812  makes sense for the data type. This should prevent the user
3813  from inadvertently running RelVal for data.
3814  - It is not possible to run single-step harvesting jobs on
3815  samples that are not fully contained at a single site.
3816  - Each dataset/run has to be available at at least one site.
3817 
3818  """
3819 
3820  self.logger.info("Performing sanity checks on dataset list...")
3821 
3822  dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3823 
3824  for dataset_name in self.datasets_to_use.keys():
3825 
3826  # Check CMSSW version.
3827  version_from_dataset = self.datasets_information[dataset_name] \
3828  ["cmssw_version"]
3829  if version_from_dataset != self.cmssw_version:
3830  msg = " CMSSW version mismatch for dataset `%s' " \
3831  "(%s vs. %s)" % \
3832  (dataset_name,
3833  self.cmssw_version, version_from_dataset)
3834  if self.force_running:
3835  # Expert mode: just warn, then continue.
3836  self.logger.warning("%s " \
3837  "--> `force mode' active: " \
3838  "run anyway" % msg)
3839  else:
3840  del dataset_names_after_checks[dataset_name]
3841  self.logger.warning("%s " \
3842  "--> skipping" % msg)
3843  continue
3844 
3845  ###
3846 
3847  # Check that the harvesting type makes sense for the
3848  # sample. E.g. normally one would not run the DQMOffline
3849  # harvesting on Monte Carlo.
3850  # TODO TODO TODO
3851  # This should be further refined.
3852  suspicious = False
3853  datatype = self.datasets_information[dataset_name]["datatype"]
3854  if datatype == "data":
3855  # Normally only DQM harvesting is run on data.
3856  if self.harvesting_type != "DQMOffline":
3857  suspicious = True
3858  elif datatype == "mc":
3859  if self.harvesting_type == "DQMOffline":
3860  suspicious = True
3861  else:
3862  # Doh!
3863  assert False, "ERROR Impossible data type `%s' " \
3864  "for dataset `%s'" % \
3865  (datatype, dataset_name)
3866  if suspicious:
3867  msg = " Normally one does not run `%s' harvesting " \
3868  "on %s samples, are you sure?" % \
3869  (self.harvesting_type, datatype)
3870  if self.force_running:
3871  self.logger.warning("%s " \
3872  "--> `force mode' active: " \
3873  "run anyway" % msg)
3874  else:
3875  del dataset_names_after_checks[dataset_name]
3876  self.logger.warning("%s " \
3877  "--> skipping" % msg)
3878  continue
3879 
3880  # TODO TODO TODO end
3881 
3882  ###
3883 
3884  # BUG BUG BUG
3885  # For the moment, due to a problem with DBS, I cannot
3886  # figure out the GlobalTag for data by myself. (For MC
3887  # it's no problem.) This means that unless a GlobalTag was
3888  # specified from the command line, we will have to skip
3889  # any data datasets.
3890 
3891  if datatype == "data":
3892  if self.globaltag is None:
3893  msg = "For data datasets (like `%s') " \
3894  "we need a GlobalTag" % \
3895  dataset_name
3896  del dataset_names_after_checks[dataset_name]
3897  self.logger.warning("%s " \
3898  "--> skipping" % msg)
3899  continue
3900 
3901  # BUG BUG BUG end
3902 
3903  ###
3904 
3905  # Check if the GlobalTag exists and (if we're using
3906  # reference histograms) if it's ready to be used with
3907  # reference histograms.
3908  globaltag = self.datasets_information[dataset_name]["globaltag"]
3909  if not globaltag in self.globaltag_check_cache:
3910  if self.check_globaltag(globaltag):
3911  self.globaltag_check_cache.append(globaltag)
3912  else:
3913  msg = "Something is wrong with GlobalTag `%s' " \
3914  "used by dataset `%s'!" % \
3915  (globaltag, dataset_name)
3916  if self.use_ref_hists:
3917  msg += "\n(Either it does not exist or it " \
3918  "does not contain the required key to " \
3919  "be used with reference histograms.)"
3920  else:
3921  msg += "\n(It probably just does not exist.)"
3922  self.logger.fatal(msg)
3923  raise Usage(msg)
3924 
3925  ###
3926 
3927  # Require that each run is available at least somewhere.
3928  runs_without_sites = [i for (i, j) in \
3929  self.datasets_information[dataset_name] \
3930  ["sites"].items() \
3931  if len(j) < 1 and \
3932  i in self.datasets_to_use[dataset_name]]
3933  if len(runs_without_sites) > 0:
3934  for run_without_sites in runs_without_sites:
3935  try:
3936  dataset_names_after_checks[dataset_name].remove(run_without_sites)
3937  except KeyError:
3938  pass
3939  self.logger.warning(" removed %d unavailable run(s) " \
3940  "from dataset `%s'" % \
3941  (len(runs_without_sites), dataset_name))
3942  self.logger.debug(" (%s)" % \
3943  ", ".join([str(i) for i in \
3944  runs_without_sites]))
3945 
3946  ###
3947 
3948  # Unless we're running two-step harvesting: only allow
3949  # samples located on a single site.
3950  if not self.harvesting_mode == "two-step":
3951  for run_number in self.datasets_to_use[dataset_name]:
3952  # DEBUG DEBUG DEBUG
3953 ## if self.datasets_information[dataset_name]["num_events"][run_number] != 0:
3954 ## pdb.set_trace()
3955  # DEBUG DEBUG DEBUG end
3956  num_sites = len(self.datasets_information[dataset_name] \
3957  ["sites"][run_number])
3958  if num_sites > 1 and \
3959  not self.datasets_information[dataset_name] \
3960  ["mirrored"][run_number]:
3961  # Cannot do this with a single-step job, not
3962  # even in force mode. It just does not make
3963  # sense.
3964  msg = " Dataset `%s', run %d is spread across more " \
3965  "than one site.\n" \
3966  " Cannot run single-step harvesting on " \
3967  "samples spread across multiple sites" % \
3968  (dataset_name, run_number)
3969  try:
3970  dataset_names_after_checks[dataset_name].remove(run_number)
3971  except KeyError:
3972  pass
3973  self.logger.warning("%s " \
3974  "--> skipping" % msg)
3975 
3976  ###
3977 
3978  # Require that the dataset/run is non-empty.
3979  # NOTE: To avoid reconsidering empty runs/datasets next
3980  # time around, we do include them in the book keeping.
3981  # BUG BUG BUG
3982  # This should sum only over the runs that we use!
3983  tmp = [j for (i, j) in self.datasets_information \
3984  [dataset_name]["num_events"].items() \
3985  if i in self.datasets_to_use[dataset_name]]
3986  num_events_dataset = sum(tmp)
3987  # BUG BUG BUG end
3988  if num_events_dataset < 1:
3989  msg = " dataset `%s' is empty" % dataset_name
3990  del dataset_names_after_checks[dataset_name]
3991  self.logger.warning("%s " \
3992  "--> skipping" % msg)
3993  # Update the book keeping with all the runs in the dataset.
3994  # DEBUG DEBUG DEBUG
3995  #assert set([j for (i, j) in self.datasets_information \
3996  # [dataset_name]["num_events"].items() \
3997  # if i in self.datasets_to_use[dataset_name]]) == \
3998  # set([0])
3999  # DEBUG DEBUG DEBUG end
4000  #self.book_keeping_information[dataset_name] = self.datasets_information \
4001  # [dataset_name]["num_events"]
4002  continue
4003 
4004  tmp = [i for i in \
4005  self.datasets_information[dataset_name] \
4006  ["num_events"].items() if i[1] < 1]
4007  tmp = [i for i in tmp if i[0] in self.datasets_to_use[dataset_name]]
4008  empty_runs = dict(tmp)
4009  if len(empty_runs) > 0:
4010  for empty_run in empty_runs:
4011  try:
4012  dataset_names_after_checks[dataset_name].remove(empty_run)
4013  except KeyError:
4014  pass
4015  self.logger.info(" removed %d empty run(s) from dataset `%s'" % \
4016  (len(empty_runs), dataset_name))
4017  self.logger.debug(" (%s)" % \
4018  ", ".join([str(i) for i in empty_runs]))
4019 
4020  ###
4021 
4022  # If we emptied out a complete dataset, remove the whole
4023  # thing.
4024  dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4025  for (dataset_name, runs) in six.iteritems(dataset_names_after_checks):
4026  if len(runs) < 1:
4027  self.logger.warning(" Removing dataset without any runs " \
4028  "(left) `%s'" % \
4029  dataset_name)
4030  del dataset_names_after_checks_tmp[dataset_name]
4031  dataset_names_after_checks = dataset_names_after_checks_tmp
4032 
4033  ###
4034 
4035  self.logger.warning(" --> Removed %d dataset(s)" % \
4036  (len(self.datasets_to_use) -
4037  len(dataset_names_after_checks)))
4038 
4039  # Now store the modified version of the dataset list.
4040  self.datasets_to_use = dataset_names_after_checks
4041 
4042  # End of check_dataset_list.
4043 
4044  ##########
4045 
4046  def escape_dataset_name(self, dataset_name):
4047  """Escape a DBS dataset name.
4048 
4049  Escape a DBS dataset name such that it does not cause trouble
4050  with the file system. This means turning each `/' into `__',
4051  except for the first one which is just removed.
4052 
4053  """
4054 
4055  escaped_dataset_name = dataset_name
4056  escaped_dataset_name = escaped_dataset_name.strip("/")
4057  escaped_dataset_name = escaped_dataset_name.replace("/", "__")
4058 
4059  return escaped_dataset_name
4060 
4061  ##########
4062 
4063  # BUG BUG BUG
4064  # This is a bit of a redundant method, isn't it?
4065  def create_config_file_name(self, dataset_name, run_number):
4066  """Generate the name of the configuration file to be run by
4067  CRAB.
4068 
4069  Depending on the harvesting mode (single-step or two-step)
4070  this is the name of the real harvesting configuration or the
4071  name of the first-step ME summary extraction configuration.
4072 
4073  """
4074 
4075  if self.harvesting_mode == "single-step":
4076  config_file_name = self.create_harvesting_config_file_name(dataset_name)
4077  elif self.harvesting_mode == "single-step-allow-partial":
4078  config_file_name = self.create_harvesting_config_file_name(dataset_name)
4079 ## # Only add the alarming piece to the file name if this is
4080 ## # a spread-out dataset.
4081 ## pdb.set_trace()
4082 ## if self.datasets_information[dataset_name] \
4083 ## ["mirrored"][run_number] == False:
4084 ## config_file_name = config_file_name.replace(".py", "_partial.py")
4085  elif self.harvesting_mode == "two-step":
4086  config_file_name = self.create_me_summary_config_file_name(dataset_name)
4087  else:
4088  assert False, "ERROR Unknown harvesting mode `%s'" % \
4089  self.harvesting_mode
4090 
4091  # End of create_config_file_name.
4092  return config_file_name
4093  # BUG BUG BUG end
4094 
4095  ##########
4096 
4097  def create_harvesting_config_file_name(self, dataset_name):
4098  "Generate the name to be used for the harvesting config file."
4099 
4100  file_name_base = "harvesting.py"
4101  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4102  config_file_name = file_name_base.replace(".py",
4103  "_%s.py" % \
4104  dataset_name_escaped)
4105 
4106  # End of create_harvesting_config_file_name.
4107  return config_file_name
4108 
4109  ##########
4110 
4111  def create_me_summary_config_file_name(self, dataset_name):
4112  "Generate the name of the ME summary extraction config file."
4113 
4114  file_name_base = "me_extraction.py"
4115  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4116  config_file_name = file_name_base.replace(".py",
4117  "_%s.py" % \
4118  dataset_name_escaped)
4119 
4120  # End of create_me_summary_config_file_name.
4121  return config_file_name
4122 
4123  ##########
4124 
4125  def create_output_file_name(self, dataset_name, run_number=None):
4126  """Create the name of the output file name to be used.
4127 
4128  This is the name of the output file of the `first step'. In
4129  the case of single-step harvesting this is already the final
4130  harvesting output ROOT file. In the case of two-step
4131  harvesting it is the name of the intermediary ME summary
4132  file.
4133 
4134  """
4135 
4136  # BUG BUG BUG
4137  # This method has become a bit of a mess. Originally it was
4138  # nice to have one entry point for both single- and two-step
4139  # output file names. However, now the former needs the run
4140  # number, while the latter does not even know about run
4141  # numbers. This should be fixed up a bit.
4142  # BUG BUG BUG end
4143 
4144  if self.harvesting_mode == "single-step":
4145  # DEBUG DEBUG DEBUG
4146  assert not run_number is None
4147  # DEBUG DEBUG DEBUG end
4148  output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4149  elif self.harvesting_mode == "single-step-allow-partial":
4150  # DEBUG DEBUG DEBUG
4151  assert not run_number is None
4152  # DEBUG DEBUG DEBUG end
4153  output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4154  elif self.harvesting_mode == "two-step":
4155  # DEBUG DEBUG DEBUG
4156  assert run_number is None
4157  # DEBUG DEBUG DEBUG end
4158  output_file_name = self.create_me_summary_output_file_name(dataset_name)
4159  else:
4160  # This should not be possible, but hey...
4161  assert False, "ERROR Unknown harvesting mode `%s'" % \
4162  self.harvesting_mode
4163 
4164  # End of create_harvesting_output_file_name.
4165  return output_file_name
4166 
4167  ##########
4168 
4169  def create_harvesting_output_file_name(self, dataset_name, run_number):
4170  """Generate the name to be used for the harvesting output file.
4171 
4172  This harvesting output file is the _final_ ROOT output file
4173  containing the harvesting results. In case of two-step
4174  harvesting there is an intermediate ME output file as well.
4175 
4176  """
4177 
4178  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4179 
4180  # Hmmm, looking at the code for the DQMFileSaver this might
4181  # actually be the place where the first part of this file
4182  # naming scheme comes from.
4183  # NOTE: It looks like the `V0001' comes from the DQM
4184  # version. This is something that cannot be looked up from
4185  # here, so let's hope it does not change too often.
4186  output_file_name = "DQM_V0001_R%09d__%s.root" % \
4187  (run_number, dataset_name_escaped)
4188  if self.harvesting_mode.find("partial") > -1:
4189  # Only add the alarming piece to the file name if this is
4190  # a spread-out dataset.
4191  if self.datasets_information[dataset_name] \
4192  ["mirrored"][run_number] == False:
4193  output_file_name = output_file_name.replace(".root", \
4194  "_partial.root")
4195 
4196  # End of create_harvesting_output_file_name.
4197  return output_file_name
4198 
4199  ##########
4200 
4201  def create_me_summary_output_file_name(self, dataset_name):
4202  """Generate the name of the intermediate ME file name to be
4203  used in two-step harvesting.
4204 
4205  """
4206 
4207  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4208  output_file_name = "me_summary_%s.root" % \
4209  dataset_name_escaped
4210 
4211  # End of create_me_summary_output_file_name.
4212  return output_file_name
4213 
4214  ##########
4215 
4216  def create_multicrab_block_name(self, dataset_name, run_number, index):
4217  """Create the block name to use for this dataset/run number.
4218 
4219  This is what appears in the brackets `[]' in multicrab.cfg. It
4220  is used as the name of the job and to create output
4221  directories.
4222 
4223  """
4224 
4225  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4226  block_name = "%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4227 
4228  # End of create_multicrab_block_name.
4229  return block_name
4230 
4231  ##########
4232 
4234  """Create a CRAB configuration for a given job.
4235 
4236  NOTE: This is _not_ a complete (as in: submittable) CRAB
4237  configuration. It is used to store the common settings for the
4238  multicrab configuration.
4239 
4240  NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4241 
4242  NOTE: According to CRAB, you `Must define exactly two of
4243  total_number_of_events, events_per_job, or
4244  number_of_jobs.'. For single-step harvesting we force one job,
4245  for the rest we don't really care.
4246 
4247  # BUG BUG BUG
4248  # With the current version of CRAB (2.6.1), in which Daniele
4249  # fixed the behaviour of no_block_boundary for me, one _has to
4250  # specify_ the total_number_of_events and one single site in
4251  # the se_white_list.
4252  # BUG BUG BUG end
4253 
4254  """
4255 
4256  tmp = []
4257 
4258  # This is the stuff we will need to fill in.
4259  castor_prefix = self.castor_prefix
4260 
4261  tmp.append(self.config_file_header())
4262  tmp.append("")
4263 
4264  ## CRAB
4265  ##------
4266  tmp.append("[CRAB]")
4267  tmp.append("jobtype = cmssw")
4268  tmp.append("")
4269 
4270  ## GRID
4271  ##------
4272  tmp.append("[GRID]")
4273  tmp.append("virtual_organization=cms")
4274  tmp.append("")
4275 
4276  ## USER
4277  ##------
4278  tmp.append("[USER]")
4279  tmp.append("copy_data = 1")
4280  tmp.append("")
4281 
4282  ## CMSSW
4283  ##-------
4284  tmp.append("[CMSSW]")
4285  tmp.append("# This reveals data hosted on T1 sites,")
4286  tmp.append("# which is normally hidden by CRAB.")
4287  tmp.append("show_prod = 1")
4288  tmp.append("number_of_jobs = 1")
4289  if self.Jsonlumi == True:
4290  tmp.append("lumi_mask = %s" % self.Jsonfilename)
4291  tmp.append("total_number_of_lumis = -1")
4292  else:
4293  if self.harvesting_type == "DQMOffline":
4294  tmp.append("total_number_of_lumis = -1")
4295  else:
4296  tmp.append("total_number_of_events = -1")
4297  if self.harvesting_mode.find("single-step") > -1:
4298  tmp.append("# Force everything to run in one job.")
4299  tmp.append("no_block_boundary = 1")
4300  tmp.append("")
4301 
4302  ## CAF
4303  ##-----
4304  tmp.append("[CAF]")
4305 
4306  crab_config = "\n".join(tmp)
4307 
4308  # End of create_crab_config.
4309  return crab_config
4310 
4311  ##########
4312 
4314  """Create a multicrab.cfg file for all samples.
4315 
4316  This creates the contents for a multicrab.cfg file that uses
4317  the crab.cfg file (generated elsewhere) for the basic settings
4318  and contains blocks for each run of each dataset.
4319 
4320  # BUG BUG BUG
4321  # The fact that it's necessary to specify the se_white_list
4322  # and the total_number_of_events is due to our use of CRAB
4323  # version 2.6.1. This should no longer be necessary in the
4324  # future.
4325  # BUG BUG BUG end
4326 
4327  """
4328 
4329  cmd="who i am | cut -f1 -d' '"
4330  (status, output)=commands.getstatusoutput(cmd)
4331  UserName = output
4332 
4333  if self.caf_access == True:
4334  print("Extracting %s as user name" %UserName)
4335 
4336  number_max_sites = self.nr_max_sites + 1
4337 
4338  multicrab_config_lines = []
4339  multicrab_config_lines.append(self.config_file_header())
4340  multicrab_config_lines.append("")
4341  multicrab_config_lines.append("[MULTICRAB]")
4342  multicrab_config_lines.append("cfg = crab.cfg")
4343  multicrab_config_lines.append("")
4344 
4345  dataset_names = sorted(self.datasets_to_use.keys())
4346 
4347  for dataset_name in dataset_names:
4348  runs = self.datasets_to_use[dataset_name]
4349  dataset_name_escaped = self.escape_dataset_name(dataset_name)
4350  castor_prefix = self.castor_prefix
4351 
4352  for run in runs:
4353 
4354  # CASTOR output dir.
4355  castor_dir = self.datasets_information[dataset_name] \
4356  ["castor_path"][run]
4357 
4358  cmd = "rfdir %s" % castor_dir
4359  (status, output) = commands.getstatusoutput(cmd)
4360 
4361  if len(output) <= 0:
4362 
4363  # DEBUG DEBUG DEBUG
4364  # We should only get here if we're treating a
4365  # dataset/run that is fully contained at a single
4366  # site.
4367  assert (len(self.datasets_information[dataset_name] \
4368  ["sites"][run]) == 1) or \
4369  self.datasets_information[dataset_name]["mirrored"]
4370  # DEBUG DEBUG DEBUG end
4371 
4372  site_names = self.datasets_information[dataset_name] \
4373  ["sites"][run].keys()
4374 
4375  for i in range(1, number_max_sites, 1):
4376  if len(site_names) > 0:
4377  index = "site_%02d" % (i)
4378 
4379  config_file_name = self. \
4380  create_config_file_name(dataset_name, run)
4381  output_file_name = self. \
4382  create_output_file_name(dataset_name, run)
4383 
4384 
4385  # If we're looking at a mirrored dataset we just pick
4386  # one of the sites. Otherwise there is nothing to
4387  # choose.
4388 
4389  # Loop variable
4390  loop = 0
4391 
4392  if len(site_names) > 1:
4393  cmssw_version = self.datasets_information[dataset_name] \
4394  ["cmssw_version"]
4395  self.logger.info("Picking site for mirrored dataset " \
4396  "`%s', run %d" % \
4397  (dataset_name, run))
4398  site_name = self.pick_a_site(site_names, cmssw_version)
4399  if site_name in site_names:
4400  site_names.remove(site_name)
4401 
4402  else:
4403  site_name = site_names[0]
4404  site_names.remove(site_name)
4405 
4406  if site_name is self.no_matching_site_found_str:
4407  if loop < 1:
4408  break
4409 
4410  nevents = self.datasets_information[dataset_name]["num_events"][run]
4411 
4412  # The block name.
4413  multicrab_block_name = self.create_multicrab_block_name( \
4414  dataset_name, run, index)
4415  multicrab_config_lines.append("[%s]" % \
4416  multicrab_block_name)
4417 
4418  ## CRAB
4419  ##------
4420  if site_name == "caf.cern.ch":
4421  multicrab_config_lines.append("CRAB.use_server=0")
4422  multicrab_config_lines.append("CRAB.scheduler=caf")
4423  else:
4424  multicrab_config_lines.append("scheduler = glite")
4425 
4426  ## GRID
4427  ##------
4428  if site_name == "caf.cern.ch":
4429  pass
4430  else:
4431  multicrab_config_lines.append("GRID.se_white_list = %s" % \
4432  site_name)
4433  multicrab_config_lines.append("# This removes the default blacklisting of T1 sites.")
4434  multicrab_config_lines.append("GRID.remove_default_blacklist = 1")
4435  multicrab_config_lines.append("GRID.rb = CERN")
4436  if not self.non_t1access:
4437  multicrab_config_lines.append("GRID.role = t1access")
4438 
4439  ## USER
4440  ##------
4441 
4442  castor_dir = castor_dir.replace(castor_prefix, "")
4443  multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4444  multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4445  castor_dir)
4446  multicrab_config_lines.append("USER.check_user_remote_dir=0")
4447 
4448  if site_name == "caf.cern.ch":
4449  multicrab_config_lines.append("USER.storage_path=%s" % castor_prefix)
4450  #multicrab_config_lines.append("USER.storage_element=T2_CH_CAF")
4451  #castor_dir = castor_dir.replace("/cms/store/caf/user/%s" %UserName, "")
4452  #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4453  # castor_dir)
4454  else:
4455  multicrab_config_lines.append("USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4456  #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4457  # castor_dir)
4458  #multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4459 
4460  ## CMSSW
4461  ##-------
4462  multicrab_config_lines.append("CMSSW.pset = %s" % \
4463  config_file_name)
4464  multicrab_config_lines.append("CMSSW.datasetpath = %s" % \
4465  dataset_name)
4466  multicrab_config_lines.append("CMSSW.runselection = %d" % \
4467  run)
4468 
4469  if self.Jsonlumi == True:
4470  pass
4471  else:
4472  if self.harvesting_type == "DQMOffline":
4473  pass
4474  else:
4475  multicrab_config_lines.append("CMSSW.total_number_of_events = %d" % \
4476  nevents)
4477  # The output file name.
4478  multicrab_config_lines.append("CMSSW.output_file = %s" % \
4479  output_file_name)
4480 
4481  ## CAF
4482  ##-----
4483  if site_name == "caf.cern.ch":
4484  multicrab_config_lines.append("CAF.queue=cmscaf1nd")
4485 
4486 
4487  # End of block.
4488  multicrab_config_lines.append("")
4489 
4490  loop = loop + 1
4491 
4492  self.all_sites_found = True
4493 
4494  multicrab_config = "\n".join(multicrab_config_lines)
4495 
4496  # End of create_multicrab_config.
4497  return multicrab_config
4498 
4499  ##########
4500 
4501  def check_globaltag(self, globaltag=None):
4502  """Check if globaltag exists.
4503 
4504  Check if globaltag exists as GlobalTag in the database given
4505  by self.frontier_connection_name['globaltag']. If globaltag is
4506  None, self.globaltag is used instead.
4507 
4508  If we're going to use reference histograms this method also
4509  checks for the existence of the required key in the GlobalTag.
4510 
4511  """
4512 
4513  if globaltag is None:
4514  globaltag = self.globaltag
4515 
4516  # All GlobalTags should end in `::All', right?
4517  if globaltag.endswith("::All"):
4518  globaltag = globaltag[:-5]
4519 
4520  connect_name = self.frontier_connection_name["globaltag"]
4521  # BUG BUG BUG
4522  # There is a bug in cmscond_tagtree_list: some magic is
4523  # missing from the implementation requiring one to specify
4524  # explicitly the name of the squid to connect to. Since the
4525  # cmsHarvester can only be run from the CERN network anyway,
4526  # cmsfrontier:8000 is hard-coded in here. Not nice but it
4527  # works.
4528  connect_name = connect_name.replace("frontier://",
4529  "frontier://cmsfrontier:8000/")
4530  # BUG BUG BUG end
4531  connect_name += self.db_account_name_cms_cond_globaltag()
4532 
4533  tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4534 
4535  #----------
4536 
4537  tag_contains_ref_hist_key = False
4538  if self.use_ref_hists and tag_exists:
4539  # Check for the key required to use reference histograms.
4540  tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4541 
4542  #----------
4543 
4544  if self.use_ref_hists:
4545  ret_val = tag_exists and tag_contains_ref_hist_key
4546  else:
4547  ret_val = tag_exists
4548 
4549  #----------
4550 
4551  # End of check_globaltag.
4552  return ret_val
4553 
4554  ##########
4555 
4556  def check_globaltag_exists(self, globaltag, connect_name):
4557  """Check if globaltag exists.
4558 
4559  """
4560 
4561  self.logger.info("Checking existence of GlobalTag `%s'" % \
4562  globaltag)
4563  self.logger.debug(" (Using database connection `%s')" % \
4564  connect_name)
4565 
4566  cmd = "cmscond_tagtree_list -c %s -T %s" % \
4567  (connect_name, globaltag)
4568  (status, output) = commands.getstatusoutput(cmd)
4569  if status != 0 or \
4570  output.find("error") > -1:
4571  msg = "Could not check existence of GlobalTag `%s' in `%s'" % \
4572  (globaltag, connect_name)
4573  if output.find(".ALL_TABLES not found") > -1:
4574  msg = "%s\n" \
4575  "Missing database account `%s'" % \
4576  (msg, output.split(".ALL_TABLES")[0].split()[-1])
4577  self.logger.fatal(msg)
4578  self.logger.debug("Command used:")
4579  self.logger.debug(" %s" % cmd)
4580  self.logger.debug("Output received:")
4581  self.logger.debug(output)
4582  raise Error(msg)
4583  if output.find("does not exist") > -1:
4584  self.logger.debug("GlobalTag `%s' does not exist in `%s':" % \
4585  (globaltag, connect_name))
4586  self.logger.debug("Output received:")
4587  self.logger.debug(output)
4588  tag_exists = False
4589  else:
4590  tag_exists = True
4591  self.logger.info(" GlobalTag exists? -> %s" % tag_exists)
4592 
4593  # End of check_globaltag_exists.
4594  return tag_exists
4595 
4596  ##########
4597 
4598  def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name):
4599  """Check if globaltag contains the required RefHistos key.
4600 
4601  """
4602 
4603  # Check for the key required to use reference histograms.
4604  tag_contains_key = None
4605  ref_hist_key = "RefHistos"
4606  self.logger.info("Checking existence of reference " \
4607  "histogram key `%s' in GlobalTag `%s'" % \
4608  (ref_hist_key, globaltag))
4609  self.logger.debug(" (Using database connection `%s')" % \
4610  connect_name)
4611  cmd = "cmscond_tagtree_list -c %s -T %s -n %s" % \
4612  (connect_name, globaltag, ref_hist_key)
4613  (status, output) = commands.getstatusoutput(cmd)
4614  if status != 0 or \
4615  output.find("error") > -1:
4616  msg = "Could not check existence of key `%s'" % \
4617  (ref_hist_key, connect_name)
4618  self.logger.fatal(msg)
4619  self.logger.debug("Command used:")
4620  self.logger.debug(" %s" % cmd)
4621  self.logger.debug("Output received:")
4622  self.logger.debug(" %s" % output)
4623  raise Error(msg)
4624  if len(output) < 1:
4625  self.logger.debug("Required key for use of reference " \
4626  "histograms `%s' does not exist " \
4627  "in GlobalTag `%s':" % \
4628  (ref_hist_key, globaltag))
4629  self.logger.debug("Output received:")
4630  self.logger.debug(output)
4631  tag_contains_key = False
4632  else:
4633  tag_contains_key = True
4634 
4635  self.logger.info(" GlobalTag contains `%s' key? -> %s" % \
4636  (ref_hist_key, tag_contains_key))
4637 
4638  # End of check_globaltag_contains_ref_hist_key.
4639  return tag_contains_key
4640 
4641  ##########
4642 
4643  def check_ref_hist_tag(self, tag_name):
4644  """Check the existence of tag_name in database connect_name.
4645 
4646  Check if tag_name exists as a reference histogram tag in the
4647  database given by self.frontier_connection_name['refhists'].
4648 
4649  """
4650 
4651  connect_name = self.frontier_connection_name["refhists"]
4652  connect_name += self.db_account_name_cms_cond_dqm_summary()
4653 
4654  self.logger.debug("Checking existence of reference " \
4655  "histogram tag `%s'" % \
4656  tag_name)
4657  self.logger.debug(" (Using database connection `%s')" % \
4658  connect_name)
4659 
4660  cmd = "cmscond_list_iov -c %s" % \
4661  connect_name
4662  (status, output) = commands.getstatusoutput(cmd)
4663  if status != 0:
4664  msg = "Could not check existence of tag `%s' in `%s'" % \
4665  (tag_name, connect_name)
4666  self.logger.fatal(msg)
4667  self.logger.debug("Command used:")
4668  self.logger.debug(" %s" % cmd)
4669  self.logger.debug("Output received:")
4670  self.logger.debug(output)
4671  raise Error(msg)
4672  if not tag_name in output.split():
4673  self.logger.debug("Reference histogram tag `%s' " \
4674  "does not exist in `%s'" % \
4675  (tag_name, connect_name))
4676  self.logger.debug(" Existing tags: `%s'" % \
4677  "', `".join(output.split()))
4678  tag_exists = False
4679  else:
4680  tag_exists = True
4681  self.logger.debug(" Reference histogram tag exists? " \
4682  "-> %s" % tag_exists)
4683 
4684  # End of check_ref_hist_tag.
4685  return tag_exists
4686 
4687  ##########
4688 
4689  def create_es_prefer_snippet(self, dataset_name):
4690  """Build the es_prefer snippet for the reference histograms.
4691 
4692  The building of the snippet is wrapped in some care-taking
4693  code that figures out the name of the reference histogram set
4694  and makes sure the corresponding tag exists.
4695 
4696  """
4697 
4698  # Figure out the name of the reference histograms tag.
4699  # NOTE: The existence of these tags has already been checked.
4700  ref_hist_tag_name = self.ref_hist_mappings[dataset_name]
4701 
4702  connect_name = self.frontier_connection_name["refhists"]
4703  connect_name += self.db_account_name_cms_cond_dqm_summary()
4704  record_name = "DQMReferenceHistogramRootFileRcd"
4705 
4706  # Build up the code snippet.
4707  code_lines = []
4708  code_lines.append("from CondCore.DBCommon.CondDBSetup_cfi import *")
4709  code_lines.append("process.ref_hist_source = cms.ESSource(\"PoolDBESSource\", CondDBSetup,")
4710  code_lines.append(" connect = cms.string(\"%s\")," % connect_name)
4711  code_lines.append(" toGet = cms.VPSet(cms.PSet(record = cms.string(\"%s\")," % record_name)
4712  code_lines.append(" tag = cms.string(\"%s\"))," % ref_hist_tag_name)
4713  code_lines.append(" )")
4714  code_lines.append(" )")
4715  code_lines.append("process.es_prefer_ref_hist_source = cms.ESPrefer(\"PoolDBESSource\", \"ref_hist_source\")")
4716 
4717  snippet = "\n".join(code_lines)
4718 
4719  # End of create_es_prefer_snippet.
4720  return snippet
4721 
4722  ##########
4723 
4724  def create_harvesting_config(self, dataset_name):
4725  """Create the Python harvesting configuration for harvesting.
4726 
4727  The basic configuration is created by
4728  Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4729  what cmsDriver.py does.) After that we add some specials
4730  ourselves.
4731 
4732  NOTE: On one hand it may not be nice to circumvent
4733  cmsDriver.py, on the other hand cmsDriver.py does not really
4734  do anything itself. All the real work is done by the
4735  ConfigBuilder so there is not much risk that we miss out on
4736  essential developments of cmsDriver in the future.
4737 
4738  """
4739 
4740  # Setup some options needed by the ConfigBuilder.
4741  config_options = defaultOptions
4742 
4743  # These are fixed for all kinds of harvesting jobs. Some of
4744  # them are not needed for the harvesting config, but to keep
4745  # the ConfigBuilder happy.
4746  config_options.name = "harvesting"
4747  config_options.scenario = "pp"
4748  config_options.number = 1
4749  config_options.arguments = self.ident_string()
4750  config_options.evt_type = config_options.name
4751  config_options.customisation_file = None
4752  config_options.filein = "dummy_value"
4753  config_options.filetype = "EDM"
4754  # This seems to be new in CMSSW 3.3.X, no clue what it does.
4755  config_options.gflash = "dummy_value"
4756  # This seems to be new in CMSSW 3.3.0.pre6, no clue what it
4757  # does.
4758  #config_options.himix = "dummy_value"
4759  config_options.dbsquery = ""
4760 
4761  ###
4762 
4763  # These options depend on the type of harvesting we're doing
4764  # and are stored in self.harvesting_info.
4765 
4766  config_options.step = "HARVESTING:%s" % \
4767  self.harvesting_info[self.harvesting_type] \
4768  ["step_string"]
4769  config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4770  ["beamspot"]
4771  config_options.eventcontent = self.harvesting_info \
4772  [self.harvesting_type] \
4773  ["eventcontent"]
4774  config_options.harvesting = self.harvesting_info \
4775  [self.harvesting_type] \
4776  ["harvesting"]
4777 
4778  ###
4779 
4780  # This one is required (see also above) for each dataset.
4781 
4782  datatype = self.datasets_information[dataset_name]["datatype"]
4783  config_options.isMC = (datatype.lower() == "mc")
4784  config_options.isData = (datatype.lower() == "data")
4785  globaltag = self.datasets_information[dataset_name]["globaltag"]
4786 
4787  config_options.conditions = self.format_conditions_string(globaltag)
4788 
4789  ###
4790 
4791  if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4792  # This is the case for 3.3.X.
4793  config_builder = ConfigBuilder(config_options, with_input=True)
4794  else:
4795  # This is the case in older CMSSW versions.
4796  config_builder = ConfigBuilder(config_options)
4797  config_builder.prepare(True)
4798  config_contents = config_builder.pythonCfgCode
4799 
4800  ###
4801 
4802  # Add our signature to the top of the configuration. and add
4803  # some markers to the head and the tail of the Python code
4804  # generated by the ConfigBuilder.
4805  marker_lines = []
4806  sep = "#" * 30
4807  marker_lines.append(sep)
4808  marker_lines.append("# Code between these markers was generated by")
4809  marker_lines.append("# Configuration.PyReleaseValidation." \
4810  "ConfigBuilder")
4811 
4812  marker_lines.append(sep)
4813  marker = "\n".join(marker_lines)
4814 
4815  tmp = [self.config_file_header()]
4816  tmp.append("")
4817  tmp.append(marker)
4818  tmp.append("")
4819  tmp.append(config_contents)
4820  tmp.append("")
4821  tmp.append(marker)
4822  tmp.append("")
4823  config_contents = "\n".join(tmp)
4824 
4825  ###
4826 
4827  # Now we add some stuff of our own.
4828  customisations = [""]
4829 
4830  customisations.append("# Now follow some customisations")
4831  customisations.append("")
4832  connect_name = self.frontier_connection_name["globaltag"]
4833  connect_name += self.db_account_name_cms_cond_globaltag()
4834  customisations.append("process.GlobalTag.connect = \"%s\"" % \
4835  connect_name)
4836 
4837 
4838  if self.saveByLumiSection == True:
4839  customisations.append("process.dqmSaver.saveByLumiSection = 1")
4840 
4842 
4843  customisations.append("")
4844 
4845  # About the reference histograms... For data there is only one
4846  # set of references and those are picked up automatically
4847  # based on the GlobalTag. For MC we have to do some more work
4848  # since the reference histograms to be used depend on the MC
4849  # sample at hand. In this case we glue in an es_prefer snippet
4850  # to pick up the references. We do this only for RelVals since
4851  # for MC there are no meaningful references so far.
4852 
4853  # NOTE: Due to the lack of meaningful references for
4854  # MC samples reference histograms are explicitly
4855  # switched off in this case.
4856 
4857  use_es_prefer = (self.harvesting_type == "RelVal")
4858  use_refs = use_es_prefer or \
4859  (not self.harvesting_type == "MC")
4860  # Allow global override.
4861  use_refs = use_refs and self.use_ref_hists
4862 
4863  if not use_refs:
4864  # Disable reference histograms explicitly. The histograms
4865  # are loaded by the dqmRefHistoRootFileGetter
4866  # EDAnalyzer. This analyzer can be run from several
4867  # sequences. Here we remove it from each sequence that
4868  # exists.
4869  customisations.append("print \"Not using reference histograms\"")
4870  customisations.append("if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4871  customisations.append(" for (sequence_name, sequence) in six.iteritems(process.sequences):")
4872  customisations.append(" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4873  customisations.append(" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4874  customisations.append(" sequence_name")
4875  customisations.append("process.dqmSaver.referenceHandling = \"skip\"")
4876  else:
4877  # This makes sure all reference histograms are saved to
4878  # the output ROOT file.
4879  customisations.append("process.dqmSaver.referenceHandling = \"all\"")
4880  if use_es_prefer:
4881  es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4882  customisations.append(es_prefer_snippet)
4883 
4884  # Make sure we get the `workflow' correct. As far as I can see
4885  # this is only important for the output file name.
4886  workflow_name = dataset_name
4887  if self.harvesting_mode == "single-step-allow-partial":
4888  workflow_name += "_partial"
4889  customisations.append("process.dqmSaver.workflow = \"%s\"" % \
4890  workflow_name)
4891 
4892  # BUG BUG BUG
4893  # This still does not work. The current two-step harvesting
4894  # efforts are on hold waiting for the solution to come from
4895  # elsewhere. (In this case the elsewhere is Daniele Spiga.)
4896 
4897 ## # In case this file is the second step (the real harvesting
4898 ## # step) of the two-step harvesting we have to tell it to use
4899 ## # our local files.
4900 ## if self.harvesting_mode == "two-step":
4901 ## castor_dir = self.datasets_information[dataset_name] \
4902 ## ["castor_path"][run]
4903 ## customisations.append("")
4904 ## customisations.append("# This is the second step (the real")
4905 ## customisations.append("# harvesting step) of a two-step")
4906 ## customisations.append("# harvesting procedure.")
4907 ## # BUG BUG BUG
4908 ## # To be removed in production version.
4909 ## customisations.append("import pdb")
4910 ## # BUG BUG BUG end
4911 ## customisations.append("import commands")
4912 ## customisations.append("import os")
4913 ## customisations.append("castor_dir = \"%s\"" % castor_dir)
4914 ## customisations.append("cmd = \"rfdir %s\" % castor_dir")
4915 ## customisations.append("(status, output) = commands.getstatusoutput(cmd)")
4916 ## customisations.append("if status != 0:")
4917 ## customisations.append(" print \"ERROR\"")
4918 ## customisations.append(" raise Exception, \"ERROR\"")
4919 ## customisations.append("file_names = [os.path.join(\"rfio:%s\" % path, i) for i in output.split() if i.startswith(\"EDM_summary\") and i.endswith(\".root\")]")
4920 ## #customisations.append("pdb.set_trace()")
4921 ## customisations.append("process.source.fileNames = cms.untracked.vstring(*file_names)")
4922 ## customisations.append("")
4923 
4924  # BUG BUG BUG end
4925 
4926  config_contents = config_contents + "\n".join(customisations)
4927 
4928  ###
4929 
4930  # End of create_harvesting_config.
4931  return config_contents
4932 
4933 ## ##########
4934 
4935 ## def create_harvesting_config_two_step(self, dataset_name):
4936 ## """Create the Python harvesting configuration for two-step
4937 ## harvesting.
4938 
4939 ## """
4940 
4941 ## # BUG BUG BUG
4942 ## config_contents = self.create_harvesting_config_single_step(dataset_name)
4943 ## # BUG BUG BUG end
4944 
4945 ## # End of create_harvesting_config_two_step.
4946 ## return config_contents
4947 
4948  ##########
4949 
4950  def create_me_extraction_config(self, dataset_name):
4951  """
4952 
4953  """
4954 
4955  # Big chunk of hard-coded Python. Not such a big deal since
4956  # this does not do much and is not likely to break.
4957  tmp = []
4958  tmp.append(self.config_file_header())
4959  tmp.append("")
4960  tmp.append("import FWCore.ParameterSet.Config as cms")
4961  tmp.append("")
4962  tmp.append("process = cms.Process(\"ME2EDM\")")
4963  tmp.append("")
4964  tmp.append("# Import of standard configurations")
4965  tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
4966  tmp.append("")
4967  tmp.append("# We don't really process any events, just keep this set to one to")
4968  tmp.append("# make sure things work.")
4969  tmp.append("process.maxEvents = cms.untracked.PSet(")
4970  tmp.append(" input = cms.untracked.int32(1)")
4971  tmp.append(" )")
4972  tmp.append("")
4973  tmp.append("process.options = cms.untracked.PSet(")
4974  tmp.append(" Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
4975  tmp.append(" )")
4976  tmp.append("")
4977  tmp.append("process.source = cms.Source(\"PoolSource\",")
4978  tmp.append(" processingMode = \\")
4979  tmp.append(" cms.untracked.string(\"RunsAndLumis\"),")
4980  tmp.append(" fileNames = \\")
4981  tmp.append(" cms.untracked.vstring(\"no_file_specified\")")
4982  tmp.append(" )")
4983  tmp.append("")
4984  tmp.append("# Output definition: drop everything except for the monitoring.")
4985  tmp.append("process.output = cms.OutputModule(")
4986  tmp.append(" \"PoolOutputModule\",")
4987  tmp.append(" outputCommands = \\")
4988  tmp.append(" cms.untracked.vstring(\"drop *\", \\")
4989  tmp.append(" \"keep *_MEtoEDMConverter_*_*\"),")
4990  output_file_name = self. \
4991  create_output_file_name(dataset_name)
4992  tmp.append(" fileName = \\")
4993  tmp.append(" cms.untracked.string(\"%s\")," % output_file_name)
4994  tmp.append(" dataset = cms.untracked.PSet(")
4995  tmp.append(" dataTier = cms.untracked.string(\"RECO\"),")
4996  tmp.append(" filterName = cms.untracked.string(\"\")")
4997  tmp.append(" )")
4998  tmp.append(" )")
4999  tmp.append("")
5000  tmp.append("# Additional output definition")
5001  tmp.append("process.out_step = cms.EndPath(process.output)")
5002  tmp.append("")
5003  tmp.append("# Schedule definition")
5004  tmp.append("process.schedule = cms.Schedule(process.out_step)")
5005  tmp.append("")
5006 
5007  config_contents = "\n".join(tmp)
5008 
5009  # End of create_me_extraction_config.
5010  return config_contents
5011 
5012  ##########
5013 
5014 ## def create_harvesting_config(self, dataset_name):
5015 ## """Create the Python harvesting configuration for a given job.
5016 
5017 ## NOTE: The reason to have a single harvesting configuration per
5018 ## sample is to be able to specify the GlobalTag corresponding to
5019 ## each sample. Since it has been decided that (apart from the
5020 ## prompt reco) datasets cannot contain runs with different
5021 ## GlobalTags, we don't need a harvesting config per run.
5022 
5023 ## NOTE: This is the place where we distinguish between
5024 ## single-step and two-step harvesting modes (at least for the
5025 ## Python job configuration).
5026 
5027 ## """
5028 
5029 ## ###
5030 
5031 ## if self.harvesting_mode == "single-step":
5032 ## config_contents = self.create_harvesting_config_single_step(dataset_name)
5033 ## elif self.harvesting_mode == "two-step":
5034 ## config_contents = self.create_harvesting_config_two_step(dataset_name)
5035 ## else:
5036 ## # Impossible harvesting mode, we should never get here.
5037 ## assert False, "ERROR: unknown harvesting mode `%s'" % \
5038 ## self.harvesting_mode
5039 
5040 ## ###
5041 
5042 ## # End of create_harvesting_config.
5043 ## return config_contents
5044 
5045  ##########
5046 
5048  """Write a CRAB job configuration Python file.
5049 
5050  """
5051 
5052  self.logger.info("Writing CRAB configuration...")
5053 
5054  file_name_base = "crab.cfg"
5055 
5056  # Create CRAB configuration.
5057  crab_contents = self.create_crab_config()
5058 
5059  # Write configuration to file.
5060  crab_file_name = file_name_base
5061  try:
5062  crab_file = file(crab_file_name, "w")
5063  crab_file.write(crab_contents)
5064  crab_file.close()
5065  except IOError:
5066  self.logger.fatal("Could not write " \
5067  "CRAB configuration to file `%s'" % \
5068  crab_file_name)
5069  raise Error("ERROR: Could not write to file `%s'!" % \
5070  crab_file_name)
5071 
5072  # End of write_crab_config.
5073 
5074  ##########
5075 
5077  """Write a multi-CRAB job configuration Python file.
5078 
5079  """
5080 
5081  self.logger.info("Writing multi-CRAB configuration...")
5082 
5083  file_name_base = "multicrab.cfg"
5084 
5085  # Create multi-CRAB configuration.
5086  multicrab_contents = self.create_multicrab_config()
5087 
5088  # Write configuration to file.
5089  multicrab_file_name = file_name_base
5090  try:
5091  multicrab_file = file(multicrab_file_name, "w")
5092  multicrab_file.write(multicrab_contents)
5093  multicrab_file.close()
5094  except IOError:
5095  self.logger.fatal("Could not write " \
5096  "multi-CRAB configuration to file `%s'" % \
5097  multicrab_file_name)
5098  raise Error("ERROR: Could not write to file `%s'!" % \
5099  multicrab_file_name)
5100 
5101  # End of write_multicrab_config.
5102 
5103  ##########
5104 
5105  def write_harvesting_config(self, dataset_name):
5106  """Write a harvesting job configuration Python file.
5107 
5108  NOTE: This knows nothing about single-step or two-step
5109  harvesting. That's all taken care of by
5110  create_harvesting_config.
5111 
5112  """
5113 
5114  self.logger.debug("Writing harvesting configuration for `%s'..." % \
5115  dataset_name)
5116 
5117  # Create Python configuration.
5118  config_contents = self.create_harvesting_config(dataset_name)
5119 
5120  # Write configuration to file.
5121  config_file_name = self. \
5123  try:
5124  config_file = file(config_file_name, "w")
5125  config_file.write(config_contents)
5126  config_file.close()
5127  except IOError:
5128  self.logger.fatal("Could not write " \
5129  "harvesting configuration to file `%s'" % \
5130  config_file_name)
5131  raise Error("ERROR: Could not write to file `%s'!" % \
5132  config_file_name)
5133 
5134  # End of write_harvesting_config.
5135 
5136  ##########
5137 
5138  def write_me_extraction_config(self, dataset_name):
5139  """Write an ME-extraction configuration Python file.
5140 
5141  This `ME-extraction' (ME = Monitoring Element) is the first
5142  step of the two-step harvesting.
5143 
5144  """
5145 
5146  self.logger.debug("Writing ME-extraction configuration for `%s'..." % \
5147  dataset_name)
5148 
5149  # Create Python configuration.
5150  config_contents = self.create_me_extraction_config(dataset_name)
5151 
5152  # Write configuration to file.
5153  config_file_name = self. \
5155  try:
5156  config_file = file(config_file_name, "w")
5157  config_file.write(config_contents)
5158  config_file.close()
5159  except IOError:
5160  self.logger.fatal("Could not write " \
5161  "ME-extraction configuration to file `%s'" % \
5162  config_file_name)
5163  raise Error("ERROR: Could not write to file `%s'!" % \
5164  config_file_name)
5165 
5166  # End of write_me_extraction_config.
5167 
5168  ##########
5169 
5170 
5171  def ref_hist_mappings_needed(self, dataset_name=None):
5172  """Check if we need to load and check the reference mappings.
5173 
5174  For data the reference histograms should be taken
5175  automatically from the GlobalTag, so we don't need any
5176  mappings. For RelVals we need to know a mapping to be used in
5177  the es_prefer code snippet (different references for each of
5178  the datasets.)
5179 
5180  WARNING: This implementation is a bit convoluted.
5181 
5182  """
5183 
5184  # If no dataset name given, do everything, otherwise check
5185  # only this one dataset.
5186  if not dataset_name is None:
5187  data_type = self.datasets_information[dataset_name] \
5188  ["datatype"]
5189  mappings_needed = (data_type == "mc")
5190  # DEBUG DEBUG DEBUG
5191  if not mappings_needed:
5192  assert data_type == "data"
5193  # DEBUG DEBUG DEBUG end
5194  else:
5195  tmp = [self.ref_hist_mappings_needed(dataset_name) \
5196  for dataset_name in \
5197  self.datasets_information.keys()]
5198  mappings_needed = (True in tmp)
5199 
5200  # End of ref_hist_mappings_needed.
5201  return mappings_needed
5202 
5203  ##########
5204 
5206  """Load the reference histogram mappings from file.
5207 
5208  The dataset name to reference histogram name mappings are read
5209  from a text file specified in self.ref_hist_mappings_file_name.
5210 
5211  """
5212 
5213  # DEBUG DEBUG DEBUG
5214  assert len(self.ref_hist_mappings) < 1, \
5215  "ERROR Should not be RE-loading " \
5216  "reference histogram mappings!"
5217  # DEBUG DEBUG DEBUG end
5218 
5219  self.logger.info("Loading reference histogram mappings " \
5220  "from file `%s'" % \
5221  self.ref_hist_mappings_file_name)
5222 
5223  mappings_lines = None
5224  try:
5225  mappings_file = file(self.ref_hist_mappings_file_name, "r")
5226  mappings_lines = mappings_file.readlines()
5227  mappings_file.close()
5228  except IOError:
5229  msg = "ERROR: Could not open reference histogram mapping "\
5230  "file `%s'" % self.ref_hist_mappings_file_name
5231  self.logger.fatal(msg)
5232  raise Error(msg)
5233 
5234  ##########
5235 
5236  # The format we expect is: two white-space separated pieces
5237  # per line. The first the dataset name for which the reference
5238  # should be used, the second one the name of the reference
5239  # histogram in the database.
5240 
5241  for mapping in mappings_lines:
5242  # Skip comment lines.
5243  if not mapping.startswith("#"):
5244  mapping = mapping.strip()
5245  if len(mapping) > 0:
5246  mapping_pieces = mapping.split()
5247  if len(mapping_pieces) != 2:
5248  msg = "ERROR: The reference histogram mapping " \
5249  "file contains a line I don't " \
5250  "understand:\n %s" % mapping
5251  self.logger.fatal(msg)
5252  raise Error(msg)
5253  dataset_name = mapping_pieces[0].strip()
5254  ref_hist_name = mapping_pieces[1].strip()
5255  # We don't want people to accidentally specify
5256  # multiple mappings for the same dataset. Just
5257  # don't accept those cases.
5258  if dataset_name in self.ref_hist_mappings:
5259  msg = "ERROR: The reference histogram mapping " \
5260  "file contains multiple mappings for " \
5261  "dataset `%s'."
5262  self.logger.fatal(msg)
5263  raise Error(msg)
5264 
5265  # All is well that ends well.
5266  self.ref_hist_mappings[dataset_name] = ref_hist_name
5267 
5268  ##########
5269 
5270  self.logger.info(" Successfully loaded %d mapping(s)" % \
5271  len(self.ref_hist_mappings))
5272  max_len = max([len(i) for i in self.ref_hist_mappings.keys()])
5273  for (map_from, map_to) in six.iteritems(self.ref_hist_mappings):
5274  self.logger.info(" %-*s -> %s" % \
5275  (max_len, map_from, map_to))
5276 
5277  # End of load_ref_hist_mappings.
5278 
5279  ##########
5280 
5282  """Make sure all necessary reference histograms exist.
5283 
5284  Check that for each of the datasets to be processed a
5285  reference histogram is specified and that that histogram
5286  exists in the database.
5287 
5288  NOTE: There's a little complication here. Since this whole
5289  thing was designed to allow (in principle) harvesting of both
5290  data and MC datasets in one go, we need to be careful to check
5291  the availability fof reference mappings only for those
5292  datasets that need it.
5293 
5294  """
5295 
5296  self.logger.info("Checking reference histogram mappings")
5297 
5298  for dataset_name in self.datasets_to_use:
5299  try:
5300  ref_hist_name = self.ref_hist_mappings[dataset_name]
5301  except KeyError:
5302  msg = "ERROR: No reference histogram mapping found " \
5303  "for dataset `%s'" % \
5304  dataset_name
5305  self.logger.fatal(msg)
5306  raise Error(msg)
5307 
5308  if not self.check_ref_hist_tag(ref_hist_name):
5309  msg = "Reference histogram tag `%s' " \
5310  "(used for dataset `%s') does not exist!" % \
5311  (ref_hist_name, dataset_name)
5312  self.logger.fatal(msg)
5313  raise Usage(msg)
5314 
5315  self.logger.info(" Done checking reference histogram mappings.")
5316 
5317  # End of check_ref_hist_mappings.
5318 
5319  ##########
5320 
5322  """Obtain all information on the datasets that we need to run.
5323 
5324  Use DBS to figure out all required information on our
5325  datasets, like the run numbers and the GlobalTag. All
5326  information is stored in the datasets_information member
5327  variable.
5328 
5329  """
5330 
5331  # Get a list of runs in the dataset.
5332  # NOTE: The harvesting has to be done run-by-run, so we
5333  # split up datasets based on the run numbers. Strictly
5334  # speaking this is not (yet?) necessary for Monte Carlo
5335  # since all those samples use run number 1. Still, this
5336  # general approach should work for all samples.
5337 
5338  # Now loop over all datasets in the list and process them.
5339  # NOTE: This processing has been split into several loops
5340  # to be easier to follow, sacrificing a bit of efficiency.
5341  self.datasets_information = {}
5342  self.logger.info("Collecting information for all datasets to process")
5343  dataset_names = sorted(self.datasets_to_use.keys())
5344  for dataset_name in dataset_names:
5345 
5346  # Tell the user which dataset: nice with many datasets.
5347  sep_line = "-" * 30
5348  self.logger.info(sep_line)
5349  self.logger.info(" `%s'" % dataset_name)
5350  self.logger.info(sep_line)
5351 
5352  runs = self.dbs_resolve_runs(dataset_name)
5353  self.logger.info(" found %d run(s)" % len(runs))
5354  if len(runs) > 0:
5355  self.logger.debug(" run number(s): %s" % \
5356  ", ".join([str(i) for i in runs]))
5357  else:
5358  # DEBUG DEBUG DEBUG
5359  # This should never happen after the DBS checks.
5360  self.logger.warning(" --> skipping dataset "
5361  "without any runs")
5362  assert False, "Panic: found a dataset without runs " \
5363  "after DBS checks!"
5364  # DEBUG DEBUG DEBUG end
5365 
5366  cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5367  self.logger.info(" found CMSSW version `%s'" % cmssw_version)
5368 
5369  # Figure out if this is data or MC.
5370  datatype = self.dbs_resolve_datatype(dataset_name)
5371  self.logger.info(" sample is data or MC? --> %s" % \
5372  datatype)
5373 
5374  ###
5375 
5376  # Try and figure out the GlobalTag to be used.
5377  if self.globaltag is None:
5378  globaltag = self.dbs_resolve_globaltag(dataset_name)
5379  else:
5380  globaltag = self.globaltag
5381 
5382  self.logger.info(" found GlobalTag `%s'" % globaltag)
5383 
5384  # DEBUG DEBUG DEBUG
5385  if globaltag == "":
5386  # Actually we should not even reach this point, after
5387  # our dataset sanity checks.
5388  assert datatype == "data", \
5389  "ERROR Empty GlobalTag for MC dataset!!!"
5390  # DEBUG DEBUG DEBUG end
5391 
5392  ###
5393 
5394  # DEBUG DEBUG DEBUG
5395  #tmp = self.dbs_check_dataset_spread_old(dataset_name)
5396  # DEBUG DEBUG DEBUG end
5397  sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5398 
5399  # Extract the total event counts.
5400  num_events = {}
5401  for run_number in sites_catalog.keys():
5402  num_events[run_number] = sites_catalog \
5403  [run_number]["all_sites"]
5404  del sites_catalog[run_number]["all_sites"]
5405 
5406  # Extract the information about whether or not datasets
5407  # are mirrored.
5408  mirror_catalog = {}
5409  for run_number in sites_catalog.keys():
5410  mirror_catalog[run_number] = sites_catalog \
5411  [run_number]["mirrored"]
5412  del sites_catalog[run_number]["mirrored"]
5413 
5414  # BUG BUG BUG
5415  # I think I could now get rid of that and just fill the
5416  # "sites" entry with the `inverse' of this
5417  # num_events_catalog(?).
5418  #num_sites = self.dbs_resolve_dataset_number_of_sites(dataset_name)
5419  #sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5420  #sites_catalog = dict(zip(num_events_catalog.keys(),
5421  # [[j for i in num_events_catalog.values() for j in i.keys()]]))
5422  # BUG BUG BUG end
5423 
5424 ## # DEBUG DEBUG DEBUG
5425 ## # This is probably only useful to make sure we don't muck
5426 ## # things up, right?
5427 ## # Figure out across how many sites this sample has been spread.
5428 ## if num_sites == 1:
5429 ## self.logger.info(" sample is contained at a single site")
5430 ## else:
5431 ## self.logger.info(" sample is spread across %d sites" % \
5432 ## num_sites)
5433 ## if num_sites < 1:
5434 ## # NOTE: This _should not_ happen with any valid dataset.
5435 ## self.logger.warning(" --> skipping dataset which is not " \
5436 ## "hosted anywhere")
5437 ## # DEBUG DEBUG DEBUG end
5438 
5439  # Now put everything in a place where we can find it again
5440  # if we need it.
5441  self.datasets_information[dataset_name] = {}
5442  self.datasets_information[dataset_name]["runs"] = runs
5443  self.datasets_information[dataset_name]["cmssw_version"] = \
5444  cmssw_version
5445  self.datasets_information[dataset_name]["globaltag"] = globaltag
5446  self.datasets_information[dataset_name]["datatype"] = datatype
5447  self.datasets_information[dataset_name]["num_events"] = num_events
5448  self.datasets_information[dataset_name]["mirrored"] = mirror_catalog
5449  self.datasets_information[dataset_name]["sites"] = sites_catalog
5450 
5451  # Each run of each dataset has a different CASTOR output
5452  # path.
5453  castor_path_common = self.create_castor_path_name_common(dataset_name)
5454  self.logger.info(" output will go into `%s'" % \
5455  castor_path_common)
5456 
5457  castor_paths = dict(list(zip(runs,
5458  [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5459  for i in runs])))
5460  for path_name in castor_paths.values():
5461  self.logger.debug(" %s" % path_name)
5462  self.datasets_information[dataset_name]["castor_path"] = \
5463  castor_paths
5464 
5465  # End of build_datasets_information.
5466 
5467  ##########
5468 
5470  """Tell the user what to do now, after this part is done.
5471 
5472  This should provide the user with some (preferably
5473  copy-pasteable) instructions on what to do now with the setups
5474  and files that have been created.
5475 
5476  """
5477 
5478  # TODO TODO TODO
5479  # This could be improved a bit.
5480  # TODO TODO TODO end
5481 
5482  sep_line = "-" * 60
5483 
5484  self.logger.info("")
5485  self.logger.info(sep_line)
5486  self.logger.info(" Configuration files have been created.")
5487  self.logger.info(" From here on please follow the usual CRAB instructions.")
5488  self.logger.info(" Quick copy-paste instructions are shown below.")
5489  self.logger.info(sep_line)
5490 
5491  self.logger.info("")
5492  self.logger.info(" Create all CRAB jobs:")
5493  self.logger.info(" multicrab -create")
5494  self.logger.info("")
5495  self.logger.info(" Submit all CRAB jobs:")
5496  self.logger.info(" multicrab -submit")
5497  self.logger.info("")
5498  self.logger.info(" Check CRAB status:")
5499  self.logger.info(" multicrab -status")
5500  self.logger.info("")
5501 
5502  self.logger.info("")
5503  self.logger.info(" For more information please see the CMS Twiki:")
5504  self.logger.info(" %s" % twiki_url)
5505  self.logger.info(sep_line)
5506 
5507  # If there were any jobs for which we could not find a
5508  # matching site show a warning message about that.
5509  if not self.all_sites_found:
5510  self.logger.warning(" For some of the jobs no matching " \
5511  "site could be found")
5512  self.logger.warning(" --> please scan your multicrab.cfg" \
5513  "for occurrences of `%s'." % \
5514  self.no_matching_site_found_str)
5515  self.logger.warning(" You will have to fix those " \
5516  "by hand, sorry.")
5517 
5518  # End of show_exit_message.
5519 
5520  ##########
5521 
5522  def run(self):
5523  "Main entry point of the CMS harvester."
5524 
5525  # Start with a positive thought.
5526  exit_code = 0
5527 
5528  try:
5529 
5530  try:
5531 
5532  # Parse all command line options and arguments
5533  self.parse_cmd_line_options()
5534  # and check that they make sense.
5535  self.check_input_status()
5536 
5537  # Check if CMSSW is setup.
5538  self.check_cmssw()
5539 
5540  # Check if DBS is setup,
5541  self.check_dbs()
5542  # and if all is fine setup the Python side.
5543  self.setup_dbs()
5544 
5545  # Fill our dictionary with all the required info we
5546  # need to understand harvesting jobs. This needs to be
5547  # done after the CMSSW version is known.
5548  self.setup_harvesting_info()
5549 
5550  # Obtain list of dataset names to consider
5551  self.build_dataset_use_list()
5552  # and the list of dataset names to ignore.
5553  self.build_dataset_ignore_list()
5554 
5555  # The same for the runs lists (if specified).
5556  self.build_runs_use_list()
5557  self.build_runs_ignore_list()
5558 
5559  # Process the list of datasets to ignore and fold that
5560  # into the list of datasets to consider.
5561  # NOTE: The run-based selection is done later since
5562  # right now we don't know yet which runs a dataset
5563  # contains.
5564  self.process_dataset_ignore_list()
5565 
5566  # Obtain all required information on the datasets,
5567  # like run numbers and GlobalTags.
5568  self.build_datasets_information()
5569 
5570  if self.use_ref_hists and \
5571  self.ref_hist_mappings_needed():
5572  # Load the dataset name to reference histogram
5573  # name mappings from file.
5574  self.load_ref_hist_mappings()
5575  # Now make sure that for all datasets we want to
5576  # process there is a reference defined. Otherwise
5577  # just bomb out before wasting any more time.
5578  self.check_ref_hist_mappings()
5579  else:
5580  self.logger.info("No need to load reference " \
5581  "histogram mappings file")
5582 
5583  # OBSOLETE OBSOLETE OBSOLETE
5584 ## # TODO TODO TODO
5585 ## # Need to think about where this should go, but
5586 ## # somewhere we have to move over the fact that we want
5587 ## # to process all runs for each dataset that we're
5588 ## # considering. This basically means copying over the
5589 ## # information from self.datasets_information[]["runs"]
5590 ## # to self.datasets_to_use[].
5591 ## for dataset_name in self.datasets_to_use.keys():
5592 ## self.datasets_to_use[dataset_name] = self.datasets_information[dataset_name]["runs"]
5593 ## # TODO TODO TODO end
5594  # OBSOLETE OBSOLETE OBSOLETE end
5595 
5596  self.process_runs_use_and_ignore_lists()
5597 
5598  # If we've been asked to sacrifice some parts of
5599  # spread-out samples in order to be able to partially
5600  # harvest them, we'll do that here.
5601  if self.harvesting_mode == "single-step-allow-partial":
5602  self.singlify_datasets()
5603 
5604  # Check dataset name(s)
5605  self.check_dataset_list()
5606  # and see if there is anything left to do.
5607  if len(self.datasets_to_use) < 1:
5608  self.logger.info("After all checks etc. " \
5609  "there are no datasets (left?) " \
5610  "to process")
5611  else:
5612 
5613  self.logger.info("After all checks etc. we are left " \
5614  "with %d dataset(s) to process " \
5615  "for a total of %d runs" % \
5616  (len(self.datasets_to_use),
5617  sum([len(i) for i in \
5618  self.datasets_to_use.values()])))
5619 
5620  # NOTE: The order in which things are done here is
5621  # important. At the end of the job, independent on
5622  # how it ends (exception, CTRL-C, normal end) the
5623  # book keeping is written to file. At that time it
5624  # should be clear which jobs are done and can be
5625  # submitted. This means we first create the
5626  # general files, and then the per-job config
5627  # files.
5628 
5629  # TODO TODO TODO
5630  # It would be good to modify the book keeping a
5631  # bit. Now we write the crab.cfg (which is the
5632  # same for all samples and runs) and the
5633  # multicrab.cfg (which contains blocks for all
5634  # runs of all samples) without updating our book
5635  # keeping. The only place we update the book
5636  # keeping is after writing the harvesting config
5637  # file for a given dataset. Since there is only
5638  # one single harvesting configuration for each
5639  # dataset, we have no book keeping information on
5640  # a per-run basis.
5641  # TODO TODO TODO end
5642 
5643  # Check if the CASTOR output area exists. If
5644  # necessary create it.
5645  self.create_and_check_castor_dirs()
5646 
5647  # Create one crab and one multicrab configuration
5648  # for all jobs together.
5649  self.write_crab_config()
5650  self.write_multicrab_config()
5651 
5652  # Loop over all datasets and create harvesting
5653  # config files for all of them. One harvesting
5654  # config per dataset is enough. The same file will
5655  # be re-used by CRAB for each run.
5656  # NOTE: We always need a harvesting
5657  # configuration. For the two-step harvesting we
5658  # also need a configuration file for the first
5659  # step: the monitoring element extraction.
5660  for dataset_name in self.datasets_to_use.keys():
5661  try:
5662  self.write_harvesting_config(dataset_name)
5663  if self.harvesting_mode == "two-step":
5664  self.write_me_extraction_config(dataset_name)
5665  except:
5666  # Doh! Just re-raise the damn thing.
5667  raise
5668  else:
5669 ## tmp = self.datasets_information[dataset_name] \
5670 ## ["num_events"]
5671  tmp = {}
5672  for run_number in self.datasets_to_use[dataset_name]:
5673  tmp[run_number] = self.datasets_information \
5674  [dataset_name]["num_events"][run_number]
5675  if dataset_name in self.book_keeping_information:
5676  self.book_keeping_information[dataset_name].update(tmp)
5677  else:
5678  self.book_keeping_information[dataset_name] = tmp
5679 
5680  # Explain the user what to do now.
5681  self.show_exit_message()
5682 
5683  except Usage as err:
5684  # self.logger.fatal(err.msg)
5685  # self.option_parser.print_help()
5686  pass
5687 
5688  except Error as err:
5689  # self.logger.fatal(err.msg)
5690  exit_code = 1
5691 
5692  except Exception as err:
5693  # Hmmm, ignore keyboard interrupts from the
5694  # user. These are not a `serious problem'. We also
5695  # skip SystemExit, which is the exception thrown when
5696  # one calls sys.exit(). This, for example, is done by
5697  # the option parser after calling print_help(). We
5698  # also have to catch all `no such option'
5699  # complaints. Everything else we catch here is a
5700  # `serious problem'.
5701  if isinstance(err, SystemExit):
5702  self.logger.fatal(err.code)
5703  elif not isinstance(err, KeyboardInterrupt):
5704  self.logger.fatal("!" * 50)
5705  self.logger.fatal(" This looks like a serious problem.")
5706  self.logger.fatal(" If you are sure you followed all " \
5707  "instructions")
5708  self.logger.fatal(" please copy the below stack trace together")
5709  self.logger.fatal(" with a description of what you were doing to")
5710  self.logger.fatal(" jeroen.hegeman@cern.ch.")
5711  self.logger.fatal(" %s" % self.ident_string())
5712  self.logger.fatal("!" * 50)
5713  self.logger.fatal(str(err))
5714  import traceback
5715  traceback_string = traceback.format_exc()
5716  for line in traceback_string.split("\n"):
5717  self.logger.fatal(line)
5718  self.logger.fatal("!" * 50)
5719  exit_code = 2
5720 
5721  # This is the stuff that we should really do, no matter
5722  # what. Of course cleaning up after ourselves is also done
5723  # from this place. This alsokeeps track of the book keeping
5724  # so far. (This means that if half of the configuration files
5725  # were created before e.g. the disk was full, we should still
5726  # have a consistent book keeping file.
5727  finally:
5728 
5729  self.cleanup()
5730 
5731  ###
5732 
5733  if self.crab_submission == True:
5734  os.system("multicrab -create")
5735  os.system("multicrab -submit")
5736 
5737  # End of run.
5738  return exit_code
5739 
5740  # End of CMSHarvester.
5741 
5742 ###########################################################################
5743 ## Main entry point.
5744 ###########################################################################
5745 
5746 if __name__ == "__main__":
5747  "Main entry point for harvesting."
5748 
5749  CMSHarvester().run()
5750 
5751  # Done.
5752 
5753 ###########################################################################
def create_crab_config(self)
def option_handler_harvesting_mode(self, option, opt_str, value, parser)
def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser)
def check_cmssw(self)
def write_harvesting_config(self, dataset_name)
def option_handler_saveByLumiSection(self, option, opt_str, value, parser)
def option_handler_sites(self, option, opt_str, value, parser)
def write_me_extraction_config(self, dataset_name)
def __init__(self, msg)
def process_runs_use_and_ignore_lists(self)
def build_dataset_ignore_list(self)
def escape_dataset_name(self, dataset_name)
if self.datasets_information[dataset_name]["num_events"][run_number] != 0: pdb.set_trace() DEBUG DEBU...
def startElement(self, name, attrs)
def singlify_datasets(self)
def option_handler_castor_dir(self, option, opt_str, value, parser)
def option_handler_dataset_name(self, option, opt_str, value, parser): """Specify the name(s) of the ...
def option_handler_force(self, option, opt_str, value, parser)
def option_handler_caf_access(self, option, opt_str, value, parser)
def option_handler_frontier_connection(self, option, opt_str, value, parser)
def option_handler_crab_submission(self, option, opt_str, value, parser)
def create_harvesting_config(self, dataset_name)
def create_castor_path_name_special(self, dataset_name, run_number, castor_path_common)
def option_handler_harvesting_type(self, option, opt_str, value, parser)
def write_crab_config(self)
def create_harvesting_config(self, dataset_name): """Create the Python harvesting configuration for a...
def replace(string, replacements)
def write_multicrab_config(self)
def build_runs_ignore_list(self)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
void find(edm::Handle< EcalRecHitCollection > &hits, DetId thisDet, std::vector< EcalRecHitCollection::const_iterator > &hit, bool debug=false)
Definition: FindCaloHit.cc:20
def option_handler_preferred_site(self, option, opt_str, value, parser)
def option_handler_quiet(self, option, opt_str, value, parser)
def run(self)
def create_castor_path_name_common(self, dataset_name)
def show_exit_message(self)
DEBUG DEBUG DEBUGThis is probably only useful to make sure we don&#39;t muckthings up, right?Figure out across how many sites this sample has been spread.
Helper class: Error exception.
def check_globaltag(self, globaltag=None)
def ref_hist_mappings_needed(self, dataset_name=None)
def dbs_resolve_runs(self, dataset_name)
def dbs_resolve_dataset_number_of_events(self, dataset_name): """Ask DBS across how many events this ...
Helper class: Usage exception.
def option_handler_no_ref_hists(self, option, opt_str, value, parser)
def create_me_extraction_config(self, dataset_name)
In case this file is the second step (the real harvestingstep) of the two-step harvesting we have to ...
def create_me_summary_config_file_name(self, dataset_name)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def build_datasets_information(self)
def check_globaltag_exists(self, globaltag, connect_name)
def check_ref_hist_mappings(self)
def check_ref_hist_tag(self, tag_name)
def dbs_resolve_datatype(self, dataset_name)
def create_multicrab_block_name(self, dataset_name, run_number, index)
def __init__(self, cmd_line_opts=None)
def __init__(self, tag_names)
def setup_harvesting_info(self)
Helper class: DBSXMLHandler.
def option_handler_list_types(self, option, opt_str, value, parser)
def create_output_file_name(self, dataset_name, run_number=None)
def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser)
def option_handler_book_keeping_file(self, option, opt_str, value, parser)
def pick_a_site(self, sites, cmssw_version)
self.logger.debug("Checking CASTOR path piece `%s&#39;" % \ piece)
def create_and_check_castor_dir(self, castor_dir)
def dbs_resolve_cmssw_version(self, dataset_name)
def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name)
def dbs_resolve_number_of_events(self, dataset_name, run_number=None)
def set_output_level(self, output_level)
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def load_ref_hist_mappings(self)
def parse_cmd_line_options(self)
def option_handler_input_Jsonfile(self, option, opt_str, value, parser)
def option_handler_input_todofile(self, option, opt_str, value, parser)
def option_handler_input_spec(self, option, opt_str, value, parser)
def create_es_prefer_snippet(self, dataset_name)
def option_handler_debug(self, option, opt_str, value, parser)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
def dbs_resolve_globaltag(self, dataset_name)
def check_dbs(self)
def format_conditions_string(self, globaltag)
CMSHarvester class.
def build_runs_list(self, input_method, input_name)
def check_dataset_list(self)
def create_harvesting_config_file_name(self, dataset_name)
Only add the alarming piece to the file name if this isa spread-out dataset.
def __init__(self, msg)
def dbs_check_dataset_spread(self, dataset_name)
def dbs_resolve_dataset_number_of_sites(self, dataset_name): """Ask DBS across how many sites this da...
#define update(a, b)
def build_dataset_use_list(self)
def create_me_summary_output_file_name(self, dataset_name)
def characters(self, content)
def db_account_name_cms_cond_globaltag(self)
def option_handler_no_t1access(self, option, opt_str, value, parser)
def option_handler_globaltag(self, option, opt_str, value, parser)
def db_account_name_cms_cond_dqm_summary(self)
def check_input_status(self)
def dbs_resolve_dataset_name(self, dataset_name)
#define str(s)
def create_config_file_name(self, dataset_name, run_number)
def setup_dbs(self)
Now we try to do a very simple DBS search.
def build_runs_use_list(self)
def create_multicrab_config(self)
def create_and_check_castor_dirs(self)
Helper class: CMSHarvesterHelpFormatter.
double split
Definition: MVATrainer.cc:139
def create_harvesting_output_file_name(self, dataset_name, run_number)
def endElement(self, name)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run
def process_dataset_ignore_list(self)
def build_dataset_list(self, input_method, input_name)
class Handler(xml.sax.handler.ContentHandler): def startElement(self, name, attrs): if name == "resul...