CMS 3D CMS Logo

CrabHelper.py
Go to the documentation of this file.
1 from __future__ import print_function
2 from __future__ import absolute_import
3 import logging
4 import sys
5 import os
6 from importlib import import_module
7 import subprocess
8 import shutil
9 import time
10 from . import tools
11 
12 log = logging.getLogger(__name__)
14 
15  def __init__(self):
16  # perform imports only when creating instance. This allows to use the classmethods e.g.for
17  # CLI construction before crab is sourced.
18  self.crabFunctions = import_module('CalibMuon.DTCalibration.Workflow.Crabtools.crabFunctions')
19  # cached member variables
20  self._crab = None
21  self._cert_info = None
22 
23  def submit_crab_task(self):
24  # create a crab config
25  log.info("Creating crab config")
26  self.create_crab_config()
27  #write crab config
28  full_crab_config_filename = self.write_crabConfig()
29  if self.options.no_exec:
30  log.info("Runing with option no-exec exiting")
31  return True
32  #submit crab job
33  log.info("Submitting crab job")
34  self.crab.submit(full_crab_config_filename)
35  log.info("crab job submitted. Waiting 120 seconds before first status call")
36  time.sleep( 120 )
37 
38  task = self.crabFunctions.CrabTask(crab_config = full_crab_config_filename)
39  task.update()
40  if task.state =="UNKNOWN":
41  time.sleep( 30 )
42  task.update()
43  success_states = ( 'QUEUED', 'SUBMITTED', "COMPLETED", "FINISHED")
44  if task.state in success_states:
45  log.info("Job in state %s" % task.state )
46  return True
47  else:
48  log.error("Job submission not successful, crab state:%s" % task.state)
49  raise RuntimeError("Job submission not successful, crab state:%s" % task.state)
50 
51  def check_crabtask(self):
53  task = self.crabFunctions.CrabTask(crab_config = self.crab_config_filepath,
54  initUpdate = False)
55 
56  if self.options.no_exec:
57  log.info("Nothing to check in no-exec mode")
58  return True
59  for n_check in range(self.options.max_checks):
60  task.update()
61  if task.state in ( "COMPLETED"):
62  print("Crab task complete. Getting output locally")
63  output_path = os.path.join( self.local_path, "unmerged_results" )
64  self.get_output_files(task, output_path)
65  return True
66  if task.state in ("SUBMITFAILED", "FAILED"):
67  print("Crab task failed")
68  return False
69  possible_job_states = ["nUnsubmitted",
70  "nIdle",
71  "nRunning",
72  "nTransferring",
73  "nCooloff",
74  "nFailed",
75  "nFinished",
76  "nComplete" ]
77 
78  jobinfos = ""
79  for jobstate in possible_job_states:
80  njobs_in_state = getattr(task, jobstate)
81  if njobs_in_state > 0:
82  jobinfos+="%s: %d " % (jobstate[1:], njobs_in_state)
83 
84  #clear line for reuse
85  sys.stdout.write("\r")
86  sys.stdout.write("".join([" " for i in range(tools.getTerminalSize()[0])]))
87  sys.stdout.write("\r")
88  prompt_text = "Check (%d/%d). Task state: %s (%s). Press q and enter to stop checks: " % (n_check,
89  self.options.max_checks, task.state, jobinfos)
90  user_input = tools.stdinWait(prompt_text, "", self.options.check_interval)
91  if user_input in ("q","Q"):
92  return False
93  print("Task not completed after %d checks (%d minutes)" % ( self.options.max_checks,
94  int( self.options.check_interval / 60. )))
95  return False
96 
98  process = subprocess.Popen(['voms-proxy-info', '-timeleft'],
99  stdout=subprocess.PIPE)
100  stdout = process.communicate()[0]
101  if process.returncode != 0:
102  return 0
103  else:
104  return int(stdout)
105 
106  def voms_proxy_create(self, passphrase = None):
107  voms = 'cms'
108  if passphrase:
109  p = subprocess.Popen([shutil.which('voms-proxy-init'), '--voms', voms, '--valid', '192:00'],
110  executable = '/bin/bash',
111  stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
112  stdout = p.communicate(input=passphrase+'\n')[0]
113  retcode = p.returncode
114  if not retcode == 0:
115  raise ProxyError('Proxy initialization command failed: %s'%stdout)
116  else:
117  retcode = subprocess.call([shutil.which('voms-proxy-init'), '--voms', voms, '--valid', '192:00'])
118  if not retcode == 0:
119  raise ProxyError('Proxy initialization command failed.')
120 
121 
123  """ Create a crab config object dependent on the chosen command option"""
124  from CalibMuon.DTCalibration.Workflow.Crabtools.crabConfigParser import CrabConfigParser
126  """ Fill common options in crab config """
127 
128  self.crab_config.add_section('General')
129  if "/" in self.crab_taskname:
130  raise ValueError( 'Sample contains "/" which is not allowed' )
131  self.crab_config.set( 'General', 'requestName', self.crab_taskname )
132  self.crab_config.set( 'General', 'workArea', self.local_path)
133  if self.options.no_log:
134  self.crab_config.set( 'General', 'transferLogs', 'False' )
135  else:
136  self.crab_config.set( 'General', 'transferLogs', 'True' )
137 
138  self.crab_config.add_section('JobType')
139  self.crab_config.set( 'JobType', 'pluginName', 'Analysis' )
140  self.crab_config.set( 'JobType', 'psetName', self.pset_path )
141  self.crab_config.set( 'JobType', 'outputFiles', self.output_files)
142  if self.input_files:
143  self.crab_config.set( 'JobType', 'inputFiles', self.input_files)
144 
145  self.crab_config.add_section('Data')
146  self.crab_config.set('Data', 'inputDataset', self.options.datasetpath)
147  # set job splitting options
148  if self.options.datasettype =="MC":
149  self.crab_config.set('Data', 'splitting', 'FileBased')
150  self.crab_config.set('Data', 'unitsPerJob', str(self.options.filesPerJob) )
151  else:
152  self.crab_config.set('Data', 'splitting', 'LumiBased')
153  self.crab_config.set('Data', 'unitsPerJob', str(self.options.lumisPerJob) )
154  if self.options.runselection:
155  self.crab_config.set( "Data",
156  "runRange",
157  ",".join( self.options.runselection )
158  )
159  # set output path in compliance with crab3 structure
160  self.crab_config.set('Data', 'publication', False)
161  self.crab_config.set('Data', 'outputDatasetTag', self.remote_out_path["outputDatasetTag"])
162  self.crab_config.set('Data', 'outLFNDirBase', self.remote_out_path["outLFNDirBase"] )
163 
164  # set site section options
165  self.crab_config.add_section('Site')
166  self.crab_config.set('Site', 'storageSite', self.options.output_site)
167  self.crab_config.set('Site', 'whitelist', self.options.ce_white_list)
168  self.crab_config.set('Site', 'blacklist', self.options.ce_black_list)
169 
170  #set user section options if necessary
171 # if self.cert_info.voGroup or self.cert_info.voRole:
172 # self.crab_config.add_section('User')
173 # if self.cert_info.voGroup:
174 # self.crab_config.set('User', "voGroup", self.cert_info.voGroup)
175 # if self.cert_info.voRole:
176 # self.crab_config.set('User', "voRole", self.cert_info.voRole)
177  log.debug("Created crab config: %s " % self.crab_config_filename)
178 
179  def write_crabConfig(self):
180  """ Write crab config file in working dir with label option as name """
181  base_path = os.path.join( self.options.working_dir,self.local_path)
182  filename = os.path.join( base_path, self.crab_config_filename)
183  if not os.path.exists(base_path):
184  os.makedirs(base_path)
185  if os.path.exists(filename):
186  raise IOError("file %s alrady exits"%(filename))
187  self.crab_config.writeCrabConfig(filename)
188  log.info( 'created crab config file %s'%filename )
189  return filename
190 
192  crabtask = CrabTask( crab_config = self.crab_config_filename )
193  splitinfo = crabtask.crabConfig.Data.outputDatasetTag.split("_")
194  run, trial = splitinfo[0].split("Run")[-1], splitinfo[1].split("v")[-1]
195  if not self.options.run:
196  self.options.run = int(run)
197  self.options.trail = int(trial)
198  if not hasattr(self.options, "datasetpath"):
199  self.options.datasetpath = crabtask.crabConfig.Data.inputDataset
200  if not hasattr(self.options, "label"):
201  self.options.label = crabtask.crabConfig.General.requestName.split("_")[0]
202 
203  @property
204  def crab(self):
205  """ Retuns a CrabController instance from cache or creates new
206  on on first call """
207  if self._crab is None:
208  if self.cert_info.voGroup:
209  self._crab = self.crabFunctions.CrabController(voGroup = self.cert_info.voGroup)
210  else:
211  self._crab = self.crabFunctions.CrabController()
212  return self._crab
213 
214  @property
215  def cert_info(self):
216  if not self._cert_info:
217  if not self.voms_proxy_time_left() > 0:
218  warn_msg = "No valid proxy, a default proxy without a specific"
219  warn_msg = "VOGroup will be used"
220  self.voms_proxy_create()
221  log.warning(warn_msg)
222  self._cert_info = self.crabFunctions.CertInfo()
223  return self._cert_info
224 
225  @property
227  if hasattr(self.options, "crab_config_path"):
228  return self.options.crab_config_path
229  return 'crab_%s_cfg.py' % self.crab_taskname
230 
231  @property
233  base_path = os.path.join( self.options.working_dir,self.local_path)
234  return os.path.join( base_path, self.crab_config_filename)
235 
236  @property
237  def crab_taskname(self):
238  taskname = self.options.label + "_" + self.options.workflow + "_"
239  if hasattr( self.options, "workflow_mode"):
240  taskname+= self.options.workflow_mode + "_"
241  taskname += "run_" + str(self.options.run) + "_v" + str(self.options.trial)
242  return taskname
243 
244 
246  pass
def fill_options_from_crab_config(self)
Definition: CrabHelper.py:191
def create_crab_config(self)
Definition: CrabHelper.py:122
This module extends the python configparser to create crab3 config files.
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def crab_config_filepath(self)
Definition: CrabHelper.py:232
def voms_proxy_create(self, passphrase=None)
Definition: CrabHelper.py:106
def getTerminalSize()
Definition: tools.py:94
def submit_crab_task(self)
Definition: CrabHelper.py:23
static std::string join(char **cmd)
Definition: RemoteFile.cc:21
def voms_proxy_time_left(self)
Definition: CrabHelper.py:97
def write_crabConfig(self)
Definition: CrabHelper.py:179
Exception for the VOMS proxy.
Definition: CrabHelper.py:245
def stdinWait(text, default, time, timeoutDisplay=None, kwargs)
Definition: tools.py:68
def crab_taskname(self)
Definition: CrabHelper.py:237
def check_crabtask(self)
Definition: CrabHelper.py:51
#define str(s)
def crab_config_filename(self)
Definition: CrabHelper.py:226