1 from __future__
import print_function
2 from __future__
import absolute_import
6 from importlib
import import_module
11 log = logging.getLogger(__name__)
17 self.
crabFunctions = import_module(
'CalibMuon.DTCalibration.Workflow.Crabtools.crabFunctions')
24 log.info(
"Creating crab config")
28 if self.options.no_exec:
29 log.info(
"Runing with option no-exec exiting")
32 log.info(
"Submitting crab job")
33 self.
crab.submit(full_crab_config_filename)
34 log.info(
"crab job submitted. Waiting 120 seconds before first status call")
37 task = self.
crabFunctions.CrabTask(crab_config = full_crab_config_filename)
39 if task.state ==
"UNKNOWN":
42 success_states = (
'QUEUED',
'SUBMITTED',
"COMPLETED",
"FINISHED")
43 if task.state
in success_states:
44 log.info(
"Job in state %s" % task.state )
47 log.error(
"Job submission not successful, crab state:%s" % task.state)
48 raise RuntimeError(
"Job submission not successful, crab state:%s" % task.state)
54 if self.options.no_exec:
55 log.info(
"Nothing to check in no-exec mode")
57 for n_check
in range(self.options.max_checks):
59 if task.state
in (
"COMPLETED"):
60 print(
"Crab task complete. Getting output locally")
61 output_path = os.path.join( self.local_path,
"unmerged_results" )
62 self.get_output_files(task, output_path)
64 if task.state
in (
"SUBMITFAILED",
"FAILED"):
65 print(
"Crab task failed")
67 possible_job_states = [
"nUnsubmitted",
77 for jobstate
in possible_job_states:
78 njobs_in_state = getattr(task, jobstate)
79 if njobs_in_state > 0:
80 jobinfos+=
"%s: %d " % (jobstate[1:], njobs_in_state)
83 sys.stdout.write(
"\r")
85 sys.stdout.write(
"\r")
86 prompt_text =
"Check (%d/%d). Task state: %s (%s). Press q and enter to stop checks: " % (n_check,
87 self.options.max_checks, task.state, jobinfos)
88 user_input =
tools.stdinWait(prompt_text,
"", self.options.check_interval)
89 if user_input
in (
"q",
"Q"):
91 print(
"Task not completed after %d checks (%d minutes)" % ( self.options.max_checks,
92 int( self.options.check_interval / 60. )))
96 process = subprocess.Popen([
'voms-proxy-info',
'-timeleft'],
97 stdout=subprocess.PIPE,
98 stderr=subprocess.STDOUT)
99 stdout = process.communicate()[0]
100 if process.returncode != 0:
108 p = subprocess.Popen([
'voms-proxy-init',
'--voms', voms,
'--valid',
'192:00'],
109 stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
110 stdout = p.communicate(input=passphrase+
'\n')[0]
111 retcode = p.returncode
113 raise ProxyError(
'Proxy initialization command failed: %s'%stdout)
115 retcode = subprocess.call([
'voms-proxy-init',
'--voms', voms,
'--valid',
'192:00'])
117 raise ProxyError(
'Proxy initialization command failed.')
121 """ Create a crab config object dependent on the chosen command option"""
122 from CalibMuon.DTCalibration.Workflow.Crabtools.crabConfigParser
import CrabConfigParser
124 """ Fill common options in crab config """
128 raise ValueError(
'Sample contains "/" which is not allowed' )
130 self.
crab_config.set(
'General',
'workArea', self.local_path)
131 if self.options.no_log:
132 self.
crab_config.set(
'General',
'transferLogs',
'False' )
134 self.
crab_config.set(
'General',
'transferLogs',
'True' )
137 self.
crab_config.set(
'JobType',
'pluginName',
'Analysis' )
138 self.
crab_config.set(
'JobType',
'psetName', self.pset_path )
139 self.
crab_config.set(
'JobType',
'outputFiles', self.output_files)
141 self.
crab_config.set(
'JobType',
'inputFiles', self.input_files)
144 self.
crab_config.set(
'Data',
'inputDataset', self.options.datasetpath)
146 if self.options.datasettype ==
"MC":
147 self.
crab_config.set(
'Data',
'splitting',
'FileBased')
148 self.
crab_config.set(
'Data',
'unitsPerJob',
str(self.options.filesPerJob) )
150 self.
crab_config.set(
'Data',
'splitting',
'LumiBased')
151 self.
crab_config.set(
'Data',
'unitsPerJob',
str(self.options.lumisPerJob) )
152 if self.options.runselection:
155 ",".
join( self.options.runselection )
159 self.
crab_config.set(
'Data',
'outputDatasetTag', self.remote_out_path[
"outputDatasetTag"])
160 self.
crab_config.set(
'Data',
'outLFNDirBase', self.remote_out_path[
"outLFNDirBase"] )
164 self.
crab_config.set(
'Site',
'storageSite', self.options.output_site)
165 self.
crab_config.set(
'Site',
'whitelist', self.options.ce_white_list)
166 self.
crab_config.set(
'Site',
'blacklist', self.options.ce_black_list)
178 """ Write crab config file in working dir with label option as name """
179 base_path = os.path.join( self.options.working_dir,self.local_path)
181 if not os.path.exists(base_path):
182 os.makedirs(base_path)
183 if os.path.exists(filename):
184 raise IOError(
"file %s alrady exits"%(filename))
186 log.info(
'created crab config file %s'%filename )
191 splitinfo = crabtask.crabConfig.Data.outputDatasetTag.split(
"_")
192 run, trial = splitinfo[0].
split(
"Run")[-1], splitinfo[1].
split(
"v")[-1]
193 if not self.options.run:
194 self.options.run =
int(run)
195 self.options.trail =
int(trial)
196 if not hasattr(self.options,
"datasetpath"):
197 self.options.datasetpath = crabtask.crabConfig.Data.inputDataset
198 if not hasattr(self.options,
"label"):
199 self.options.label = crabtask.crabConfig.General.requestName.split(
"_")[0]
203 """ Returns a CrabController instance from cache or creates new
205 if self.
_crab is None:
216 warn_msg =
"No valid proxy, a default proxy without a specific"
217 warn_msg =
"VOGroup will be used"
219 log.warning(warn_msg)
225 if hasattr(self.options,
"crab_config_path"):
226 return self.options.crab_config_path
231 base_path = os.path.join( self.options.working_dir,self.local_path)
236 taskname = self.options.label +
"_" + self.options.workflow +
"_"
237 if hasattr( self.options,
"workflow_mode"):
238 taskname+= self.options.workflow_mode +
"_"
239 taskname +=
"run_" +
str(self.options.run) +
"_v" +
str(self.options.trial)