4 from importlib
import import_module
9 log = logging.getLogger(__name__)
15 self.
crabFunctions = import_module(
'CalibMuon.DTCalibration.Workflow.Crabtools.crabFunctions')
22 log.info(
"Creating crab config")
26 if self.options.no_exec:
27 log.info(
"Runing with option no-exec exiting")
30 log.info(
"Submitting crab job")
31 self.crab.submit(full_crab_config_filename)
32 log.info(
"crab job submitted. Waiting 30 seconds before first status call")
34 task = self.crabFunctions.CrabTask(crab_config = full_crab_config_filename)
36 success_states = (
'QUEUED',
'SUBMITTED',
"COMPLETED",
"FINISHED")
37 if task.state
in success_states:
38 log.info(
"Job in state %s" % task.state )
41 log.error(
"Job submission not successful, crab state:%s" % task.state)
42 raise RuntimeError(
"Job submission not successful, crab state:%s" % task.state)
48 if self.options.no_exec:
49 log.info(
"Nothing to check in no-exec mode")
51 for n_check
in range(self.options.max_checks):
53 if task.state
in (
"COMPLETED"):
54 print "Crab task complete. Getting output locally" 55 output_path = os.path.join( self.local_path,
"unmerged_results" )
56 self.get_output_files(task, output_path)
58 if task.state
in (
"SUBMITFAILED",
"FAILED"):
59 print "Crab task failed" 61 possible_job_states = [
"nUnsubmitted",
71 for jobstate
in possible_job_states:
72 njobs_in_state = getattr(task, jobstate)
73 if njobs_in_state > 0:
74 jobinfos+=
"%s: %d " % (jobstate[1:], njobs_in_state)
77 sys.stdout.write(
"\r")
79 sys.stdout.write(
"\r")
80 prompt_text =
"Check (%d/%d). Task state: %s (%s). Press q and enter to stop checks: " % (n_check,
81 self.options.max_checks, task.state, jobinfos)
82 user_input =
tools.stdinWait(prompt_text,
"", self.options.check_interval)
83 if user_input
in (
"q",
"Q"):
85 print "Task not completed after %d checks (%d minutes)" % ( self.options.max_checks,
86 int( self.options.check_interval / 60. ))
90 process = subprocess.Popen([
'voms-proxy-info',
'-timeleft'],
91 stdout=subprocess.PIPE,
92 stderr=subprocess.STDOUT)
93 stdout = process.communicate()[0]
94 if process.returncode != 0:
102 p = subprocess.Popen([
'voms-proxy-init',
'--voms', voms,
'--valid',
'192:00'],
103 stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
104 stdout = p.communicate(input=passphrase+
'\n')[0]
105 retcode = p.returncode
107 raise ProxyError(
'Proxy initialization command failed: %s'%stdout)
109 retcode = subprocess.call([
'voms-proxy-init',
'--voms', voms,
'--valid',
'192:00'])
111 raise ProxyError(
'Proxy initialization command failed.')
115 """ Create a crab config object dependent on the chosen command option""" 116 from CalibMuon.DTCalibration.Workflow.Crabtools.crabConfigParser
import CrabConfigParser
118 """ Fill common options in crab config """ 120 self.crab_config.add_section(
'General')
122 raise ValueError(
'Sample contains "/" which is not allowed' )
123 self.crab_config.set(
'General',
'requestName', self.
crab_taskname )
124 self.crab_config.set(
'General',
'workArea', self.local_path)
125 if self.options.no_log:
126 self.crab_config.set(
'General',
'transferLogs',
'False' )
128 self.crab_config.set(
'General',
'transferLogs',
'True' )
130 self.crab_config.add_section(
'JobType')
131 self.crab_config.set(
'JobType',
'pluginName',
'Analysis' )
132 self.crab_config.set(
'JobType',
'psetName', self.pset_path )
133 self.crab_config.set(
'JobType',
'outputFiles', self.output_files)
135 self.crab_config.set(
'JobType',
'inputFiles', self.input_files)
137 self.crab_config.add_section(
'Data')
138 self.crab_config.set(
'Data',
'inputDataset', self.options.datasetpath)
140 if self.options.datasettype ==
"MC":
141 self.crab_config.set(
'Data',
'splitting',
'FileBased')
142 self.crab_config.set(
'Data',
'unitsPerJob',
str(self.options.filesPerJob) )
144 self.crab_config.set(
'Data',
'splitting',
'LumiBased')
145 self.crab_config.set(
'Data',
'unitsPerJob',
str(self.options.lumisPerJob) )
146 if self.options.runselection:
147 self.crab_config.set(
"Data",
149 ",".
join( self.options.runselection )
152 self.crab_config.set(
'Data',
'publication',
False)
153 self.crab_config.set(
'Data',
'outputDatasetTag', self.remote_out_path[
"outputDatasetTag"])
154 self.crab_config.set(
'Data',
'outLFNDirBase', self.remote_out_path[
"outLFNDirBase"] )
157 self.crab_config.add_section(
'Site')
158 self.crab_config.set(
'Site',
'storageSite', self.options.output_site)
159 self.crab_config.set(
'Site',
'whitelist', self.options.ce_white_list)
160 self.crab_config.set(
'Site',
'blacklist', self.options.ce_black_list)
172 """ Write crab config file in working dir with label option as name """ 173 base_path = os.path.join( self.options.working_dir,self.local_path)
175 if not os.path.exists(base_path):
176 os.makedirs(base_path)
177 if os.path.exists(filename):
178 raise IOError(
"file %s alrady exits"%(filename))
179 self.crab_config.writeCrabConfig(filename)
180 log.info(
'created crab config file %s'%filename )
185 splitinfo = crabtask.crabConfig.Data.outputDatasetTag.split(
"_")
186 run, trial = splitinfo[0].
split(
"Run")[-1], splitinfo[1].
split(
"v")[-1]
187 if not self.options.run:
188 self.options.run =
int(run)
189 self.options.trail =
int(trial)
190 if not hasattr(self.options,
"datasetpath"):
191 self.options.datasetpath = crabtask.crabConfig.Data.inputDataset
192 if not hasattr(self.options,
"label"):
193 self.options.label = crabtask.crabConfig.General.requestName.split(
"_")[0]
197 """ Retuns a CrabController instance from cache or creates new 199 if self.
_crab is None:
200 if self.cert_info.voGroup:
201 self.
_crab = self.crabFunctions.CrabController(voGroup = self.cert_info.voGroup)
203 self.
_crab = self.crabFunctions.CrabController()
210 warn_msg =
"No valid proxy, a default proxy without a specific" 211 warn_msg =
"VOGroup will be used" 213 log.warning(warn_msg)
214 self.
_cert_info = self.crabFunctions.CertInfo()
219 if hasattr(self.options,
"crab_config_path"):
220 return self.options.crab_config_path
225 base_path = os.path.join( self.options.working_dir,self.local_path)
230 taskname = self.options.label +
"_" + self.options.workflow +
"_" 231 if hasattr( self.options,
"workflow_mode"):
232 taskname+= self.options.workflow_mode +
"_" 233 taskname +=
"run_" +
str(self.options.run) +
"_v" +
str(self.options.trial)
def fill_options_from_crab_config(self)
def create_crab_config(self)
This module extends the python configparser to create crab3 config files.
def crab_config_filepath(self)
def voms_proxy_create(self, passphrase=None)
def submit_crab_task(self)
static std::string join(char **cmd)
def voms_proxy_time_left(self)
def write_crabConfig(self)
def crab_config_filename(self)