CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
CrabHelper.py
Go to the documentation of this file.
1 from __future__ import print_function
2 from __future__ import absolute_import
3 import logging
4 import sys
5 import os
6 from importlib import import_module
7 import subprocess
8 import time
9 from . import tools
10 
11 log = logging.getLogger(__name__)
12 class CrabHelper(object):
13 
14  def __init__(self):
15  # perform imports only when creating instance. This allows to use the classmethods e.g.for
16  # CLI construction before crab is sourced.
17  self.crabFunctions = import_module('CalibMuon.DTCalibration.Workflow.Crabtools.crabFunctions')
18  # cached member variables
19  self._crab = None
20  self._cert_info = None
21 
22  def submit_crab_task(self):
23  # create a crab config
24  log.info("Creating crab config")
25  self.create_crab_config()
26  #write crab config
27  full_crab_config_filename = self.write_crabConfig()
28  if self.options.no_exec:
29  log.info("Runing with option no-exec exiting")
30  return True
31  #submit crab job
32  log.info("Submitting crab job")
33  self.crab.submit(full_crab_config_filename)
34  log.info("crab job submitted. Waiting 120 seconds before first status call")
35  time.sleep( 120 )
36 
37  task = self.crabFunctions.CrabTask(crab_config = full_crab_config_filename)
38  task.update()
39  if task.state =="UNKNOWN":
40  time.sleep( 30 )
41  task.update()
42  success_states = ( 'QUEUED', 'SUBMITTED', "COMPLETED", "FINISHED")
43  if task.state in success_states:
44  log.info("Job in state %s" % task.state )
45  return True
46  else:
47  log.error("Job submission not successful, crab state:%s" % task.state)
48  raise RuntimeError("Job submission not successful, crab state:%s" % task.state)
49 
50  def check_crabtask(self):
52  task = self.crabFunctions.CrabTask(crab_config = self.crab_config_filepath,
53  initUpdate = False)
54  if self.options.no_exec:
55  log.info("Nothing to check in no-exec mode")
56  return True
57  for n_check in range(self.options.max_checks):
58  task.update()
59  if task.state in ( "COMPLETED"):
60  print("Crab task complete. Getting output locally")
61  output_path = os.path.join( self.local_path, "unmerged_results" )
62  self.get_output_files(task, output_path)
63  return True
64  if task.state in ("SUBMITFAILED", "FAILED"):
65  print("Crab task failed")
66  return False
67  possible_job_states = ["nUnsubmitted",
68  "nIdle",
69  "nRunning",
70  "nTransferring",
71  "nCooloff",
72  "nFailed",
73  "nFinished",
74  "nComplete" ]
75 
76  jobinfos = ""
77  for jobstate in possible_job_states:
78  njobs_in_state = getattr(task, jobstate)
79  if njobs_in_state > 0:
80  jobinfos+="%s: %d " % (jobstate[1:], njobs_in_state)
81 
82  #clear line for reuse
83  sys.stdout.write("\r")
84  sys.stdout.write("".join([" " for i in range(tools.getTerminalSize()[0])]))
85  sys.stdout.write("\r")
86  prompt_text = "Check (%d/%d). Task state: %s (%s). Press q and enter to stop checks: " % (n_check,
87  self.options.max_checks, task.state, jobinfos)
88  user_input = tools.stdinWait(prompt_text, "", self.options.check_interval)
89  if user_input in ("q","Q"):
90  return False
91  print("Task not completed after %d checks (%d minutes)" % ( self.options.max_checks,
92  int( self.options.check_interval / 60. )))
93  return False
94 
96  process = subprocess.Popen(['voms-proxy-info', '-timeleft'],
97  stdout=subprocess.PIPE,
98  stderr=subprocess.STDOUT)
99  stdout = process.communicate()[0]
100  if process.returncode != 0:
101  return 0
102  else:
103  return int(stdout)
104 
105  def voms_proxy_create(self, passphrase = None):
106  voms = 'cms'
107  if passphrase:
108  p = subprocess.Popen(['voms-proxy-init', '--voms', voms, '--valid', '192:00'],
109  stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
110  stdout = p.communicate(input=passphrase+'\n')[0]
111  retcode = p.returncode
112  if not retcode == 0:
113  raise ProxyError('Proxy initialization command failed: %s'%stdout)
114  else:
115  retcode = subprocess.call(['voms-proxy-init', '--voms', voms, '--valid', '192:00'])
116  if not retcode == 0:
117  raise ProxyError('Proxy initialization command failed.')
118 
119 
121  """ Create a crab config object dependent on the chosen command option"""
122  from CalibMuon.DTCalibration.Workflow.Crabtools.crabConfigParser import CrabConfigParser
124  """ Fill common options in crab config """
125  ### General section
126  self.crab_config.add_section('General')
127  if "/" in self.crab_taskname:
128  raise ValueError( 'Sample contains "/" which is not allowed' )
129  self.crab_config.set( 'General', 'requestName', self.crab_taskname )
130  self.crab_config.set( 'General', 'workArea', self.local_path)
131  if self.options.no_log:
132  self.crab_config.set( 'General', 'transferLogs', 'False' )
133  else:
134  self.crab_config.set( 'General', 'transferLogs', 'True' )
135  ### JobType section
136  self.crab_config.add_section('JobType')
137  self.crab_config.set( 'JobType', 'pluginName', 'Analysis' )
138  self.crab_config.set( 'JobType', 'psetName', self.pset_path )
139  self.crab_config.set( 'JobType', 'outputFiles', self.output_files)
140  if self.input_files:
141  self.crab_config.set( 'JobType', 'inputFiles', self.input_files)
142  ### Data section
143  self.crab_config.add_section('Data')
144  self.crab_config.set('Data', 'inputDataset', self.options.datasetpath)
145  # set job splitting options
146  if self.options.datasettype =="MC":
147  self.crab_config.set('Data', 'splitting', 'FileBased')
148  self.crab_config.set('Data', 'unitsPerJob', str(self.options.filesPerJob) )
149  else:
150  self.crab_config.set('Data', 'splitting', 'LumiBased')
151  self.crab_config.set('Data', 'unitsPerJob', str(self.options.lumisPerJob) )
152  if self.options.runselection:
153  self.crab_config.set( "Data",
154  "runRange",
155  ",".join( self.options.runselection )
156  )
157  # set output path in compliance with crab3 structure
158  self.crab_config.set('Data', 'publication', False)
159  self.crab_config.set('Data', 'outputDatasetTag', self.remote_out_path["outputDatasetTag"])
160  self.crab_config.set('Data', 'outLFNDirBase', self.remote_out_path["outLFNDirBase"] )
161 
162  # set site section options
163  self.crab_config.add_section('Site')
164  self.crab_config.set('Site', 'storageSite', self.options.output_site)
165  self.crab_config.set('Site', 'whitelist', self.options.ce_white_list)
166  self.crab_config.set('Site', 'blacklist', self.options.ce_black_list)
167 
168  #set user section options if necessary
169 # if self.cert_info.voGroup or self.cert_info.voRole:
170 # self.crab_config.add_section('User')
171 # if self.cert_info.voGroup:
172 # self.crab_config.set('User', "voGroup", self.cert_info.voGroup)
173 # if self.cert_info.voRole:
174 # self.crab_config.set('User', "voRole", self.cert_info.voRole)
175  log.debug("Created crab config: %s " % self.crab_config_filename)
176 
177  def write_crabConfig(self):
178  """ Write crab config file in working dir with label option as name """
179  base_path = os.path.join( self.options.working_dir,self.local_path)
180  filename = os.path.join( base_path, self.crab_config_filename)
181  if not os.path.exists(base_path):
182  os.makedirs(base_path)
183  if os.path.exists(filename):
184  raise IOError("file %s alrady exits"%(filename))
185  self.crab_config.writeCrabConfig(filename)
186  log.info( 'created crab config file %s'%filename )
187  return filename
188 
 # NOTE(review): the `def` line of this method is not part of this listing, so
 # its name and signature cannot be confirmed from here.
 # The statements below read an existing crab config and use it to set
 # entries of self.options (run, datasetpath, label).
 # NOTE(review): `CrabTask` is used as a bare name here, but no import of it is
 # visible in this file; the other methods use self.crabFunctions.CrabTask -
 # confirm, this looks like a NameError at runtime.
190  crabtask = CrabTask( crab_config = self.crab_config_filename )
 # outputDatasetTag is split on "_": the text after "Run" in the first piece
 # is taken as run, the text after "v" in the second piece as trial.
191  splitinfo = crabtask.crabConfig.Data.outputDatasetTag.split("_")
192  run, trial = splitinfo[0].split("Run")[-1], splitinfo[1].split("v")[-1]
193  if not self.options.run:
194  self.options.run = int(run)
 # NOTE(review): `trail` looks like a typo for `trial` (crab_taskname reads
 # self.options.trial) - confirm. The listing's collapsed indentation also hides
 # whether this line belongs to the `if` above.
195  self.options.trail = int(trial)
 # datasetpath and label are only filled in when the options object does not
 # have these attributes at all.
196  if not hasattr(self.options, "datasetpath"):
197  self.options.datasetpath = crabtask.crabConfig.Data.inputDataset
198  if not hasattr(self.options, "label"):
199  self.options.label = crabtask.crabConfig.General.requestName.split("_")[0]
200 
201  @property
202  def crab(self):
203  """ Retuns a CrabController instance from cache or creates new
204  on on first call """
205  if self._crab is None:
206  if self.cert_info.voGroup:
207  self._crab = self.crabFunctions.CrabController(voGroup = self.cert_info.voGroup)
208  else:
209  self._crab = self.crabFunctions.CrabController()
210  return self._crab
211 
212  @property
213  def cert_info(self):
214  if not self._cert_info:
215  if not self.voms_proxy_time_left() > 0:
216  warn_msg = "No valid proxy, a default proxy without a specific"
217  warn_msg = "VOGroup will be used"
218  self.voms_proxy_create()
219  log.warning(warn_msg)
220  self._cert_info = self.crabFunctions.CertInfo()
221  return self._cert_info
222 
223  @property
225  if hasattr(self.options, "crab_config_path"):
226  return self.options.crab_config_path
227  return 'crab_%s_cfg.py' % self.crab_taskname
228 
229  @property
231  base_path = os.path.join( self.options.working_dir,self.local_path)
232  return os.path.join( base_path, self.crab_config_filename)
233 
234  @property
235  def crab_taskname(self):
236  taskname = self.options.label + "_" + self.options.workflow + "_"
237  if hasattr( self.options, "workflow_mode"):
238  taskname+= self.options.workflow_mode + "_"
239  taskname += "run_" + str(self.options.run) + "_v" + str(self.options.trial)
240  return taskname
241 
## Exception for the VOMS proxy
class ProxyError(Exception):
    """Raised when the voms-proxy-init command fails."""
    pass
const uint16_t range(const Frame &aFrame)
This module extends the python configparser to create crab3 config files.
def getTerminalSize
Definition: tools.py:96
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def stdinWait
Definition: tools.py:70
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
Exception for the VOMS proxy.
Definition: CrabHelper.py:243
#define str(s)