7 from __future__
import print_function
10 import xml.etree.ElementTree
as ET
19 from httplib
import HTTPException
20 from multiprocessing
import Process, Queue
22 from CRABAPI.RawCommand
import crabCommand
23 from CRABClient.UserUtilities
import getConsoleLogLevel, setConsoleLogLevel
24 from CRABClient.ClientUtilities
import LOGLEVEL_MUTE
25 from CRABClient.ClientExceptions
import CachefileNotFoundException
36 def __init__(self, debug=0, logger = None , workingArea = None, voGroup = None, username = None):
38 setConsoleLogLevel(LOGLEVEL_MUTE)
40 if workingArea
is not None:
45 if voGroup
is not None:
49 if username
is not None:
54 if logger
is not None:
55 self.
logger = logger.getChild(
"CrabController")
58 self.
logger = logging.getLogger(
"CrabController")
62 if len(logging.getLogger().handlers) < 1 :
63 ch = logging.FileHandler(
'crabController.log', mode=
'a', encoding=
None, delay=
False)
64 ch.setLevel(logging.DEBUG)
66 formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s' )
68 ch.setFormatter(formatter)
84 self.
logger.
info(
"Checking if user can write to /store/user/%s on site %s with voGroup %s"%(self.
username,site , self.
voGroup) )
85 if not 'noPath' in path:
86 res = crabCommand(
'checkwrite',
'--site',site,
'--voGroup',self.
voGroup,
'--lfn', path)
88 res = crabCommand(
'checkwrite',
'--site',site,
'--voGroup',self.
voGroup)
89 if res[
'status'] ==
'SUCCESS':
90 self.
logger.
info(
"Checkwrite was sucessfully called.")
93 self.
logger.
error(
"The crab checkwrite command failed for site: %s"%site )
96 self.
logger.
error(
'Unable to perform crab checkwrite')
107 self.
logger.
info(
'Dry-run: You may check the created config and sandbox')
110 self.
logger.
info(
"crab sumbit called for task %s"%name)
124 self.
logger.
info(
'Dry-run: Created config file. ')
134 self.
logger.
info(
"crab resumbit called for task %s"%name)
145 username = os.environ[
"CERNUSERNAME"]
148 res = crabCommand(
'checkusername')
151 return res[
'username']
163 self.
logger.
info(
'Dry-run: Created config file. crab command would have been: %s'%cmd)
166 if not "crab_" in name:
167 callname =
"crab_" + name
172 if 'taskFailureMsg' in res
and 'jobs' in res:
173 return res[
'status'], res[
'jobs'], res[
'taskFailureMsg']
174 elif 'jobs' in res
and 'taskFailureMsg' not in res:
175 return res[
'status'], res[
'jobs'],
None
176 elif 'jobs' not in res
and 'taskFailureMsg' in res:
177 return res[
'status'], {},res[
'taskFailureMsg']
179 return res[
'status'],{},
None
180 except Exception
as e:
182 self.
logger.
error(
"Can not run crab status request")
183 return "NOSTATE",{},
None
190 crabCommandProcessArgs = (self.
crab_q, crabArgs)
191 p = Process(target=crabCommandProcess, args=(crabCommandProcessArgs))
206 return res[
'success'], res[
'failed']
208 self.
logger.
error(
"Error calling crab getlog for %s" %foldername)
219 return res[
'analyzedLumis']
221 self.
logger.
error(
"Error calling crab report for %s" %foldername)
230 if os.path.exists(name):
233 pset =
'crab_%s_cfg.py' % name
234 with open( pset,
'r')
as cfgfile:
235 cfo = imp.load_source(
"pycfg", pset, cfgfile )
249 dirlist = [ x
for x
in os.listdir( self.
workingArea )
if (x.startswith(
'crab_')
and os.path.isdir( os.path.join(self.
workingArea,x) ) )]
256 if name.startswith(
"crab_"):
257 crabfolder =
'%s'%name
259 crabfolder =
"crab_%s "%name
260 return crabfolder.strip()
275 (currentoptions, args ) = parser.parse_args([
" "])
289 if not hasattr(currentoptions,
'dry_run'):
290 parser.add_option(
'--dry-run', action=
'store_true', default=
False,
291 help=
'Do everything except calling CRAB or registering samples to the database.' )
292 if not hasattr(currentoptions,
'workingArea'):
293 parser.add_option(
'--workingArea',metavar=
'DIR',default=os.getcwd(),help=
'The area (full or relative path) where to create the CRAB project directory. '
294 'If the area doesn\'t exist, CRAB will try to create it using the mkdir command' \
295 ' (without -p option). Defaults to the current working directory.' )
322 res = crabCommand(*crabCommandArgs)
324 except HTTPException
as e:
325 print(
"crab error ---------------")
327 print(
"end error ---------------")
328 print(
"will try again!")
331 except CachefileNotFoundException
as e:
332 print(
"crab error ---------------")
334 print(
"end error ---------------")
335 print(crabCommandArgs)
336 res={
'status':
"CachefileNotFound",
'jobs':{}}
339 res={
'status':
"UnexpectedError",
'jobs':{}}
345 p = subprocess.Popen(
"voms-proxy-info --fqan",
346 stdout = subprocess.PIPE,
347 stderr = subprocess.PIPE,
349 stdout, stderr = p.communicate()
351 if p.returncode != 0:
356 lines = stdout.split(
"\n")
357 splitline = lines[0].
split(
"/")
358 if len(splitline) < 4:
359 splitline = lines[1].
split(
"/")
360 self.
vo = splitline[1]
382 crabController = None ,
384 debuglevel = "ERROR",
398 raise ValueError(
"Either taskname or crab_config needs to be set")
399 if not os.path.exists( crab_config):
400 raise IOError(
"File %s not found" % crab_config )
406 self.
log = logging.getLogger(
'crabTask' )
407 self.
log.setLevel(logging._levelNames[ debuglevel ])
426 self.
lastUpdate = datetime.datetime.now().strftime(
"%Y-%m-%d_%H.%M.%S" )
454 if self.
name.startswith(
"Data_" ):
483 if os.path.exists( os.path.join( self.
crabConfig.General.workArea, crab._prepareFoldername( self.
name ) ) ):
486 alternative_path = os.path.join(os.path.cwd(), crab._prepareFoldername( self.
name ) )
487 if os.path.exists( alternative_path ):
490 self.
log.
error(
"Unable to find folder for Task")
500 job = self.
jobs[jobkey]
501 if job[
'State'] ==
'failed':
502 failedJobIds.append( job[
'JobIds'][-1] )
503 controller.resubmit( self.
name, joblist = failedJobIds )
504 self.
lastUpdate = datetime.datetime.now().strftime(
"%Y-%m-%d_%H.%M.%S" )
508 return os.path.join( self.
crabConfig.General.workArea,
509 "crab_" + self.
crabConfig.General.requestName)
515 self.
log.
debug(
"Start update for task %s" % self.
name )
518 self.
state =
"UPDATING"
521 self.
log.
debug(
"Try to get status for task" )
524 if self.
state==
"FAILED":
530 if self.
state ==
"NOSTATE":
531 self.
log.
debug(
"Trying to resubmit because of NOSTATE" )
536 self.
lastUpdate = datetime.datetime.now().strftime(
"%Y-%m-%d_%H.%M.%S" )
544 if "The CRAB3 server backend could not resubmit your task because the Grid scheduler answered with an error." in task.failureReason:
546 cmd =
'mv %s bak_%s' %(crab._prepareFoldername( self.
name ),crab._prepareFoldername( self.
name ))
547 p = subprocess.Popen(cmd,stdout=subprocess.PIPE, shell=
True)
548 (out,err) = p.communicate()
549 self.
state =
"SHEDERR"
550 configName =
'%s_cfg.py' %(crab._prepareFoldername( self.
name ))
551 crab.submit( configName )
553 elif task.failureReason
is not None:
554 self.
state =
"ERRHANDLE"
555 crab.resubmit( self.
name )
567 intJobkeys = [
int(x)
for x
in jobKeys]
569 print(
"error parsing job numers to int")
573 stateDict = {
'unsubmitted':0,
'idle':0,
'running':0,
'transferring':0,
'cooloff':0,
'failed':0,
'finished':0}
580 for statekey
in stateDict.keys():
581 if statekey
in job[
'State']:
582 stateDict[statekey]+=1
584 if dCacheFileList
is not None:
585 outputFilename =
"%s_%s"%( self.
name, key)
586 if 'finished' in statekey
and any(outputFilename
in s
for s
in dCacheFileList):
589 for state
in stateDict:
590 attrname =
"n" + state.capitalize()
591 setattr(self, attrname, stateDict[state])
600 JobNumber = logArchName.split(
"/")[-1].
split(
"_")[1].
split(
".")[0]
601 log = {
'readEvents' : 0}
602 with tarfile.open( logArchName,
"r")
as tar:
604 JobXmlFile = tar.extractfile(
'FrameworkJobReport-%s.xml' % JobNumber)
605 root = ET.fromstring( JobXmlFile.read() )
607 if child.tag ==
'InputFile':
608 for subchild
in child:
609 if subchild.tag ==
'EventsRead':
610 nEvents =
int(subchild.text)
611 log.update({
'readEvents' : nEvents})
615 print(
"Can not parse / read %s" % logArchName)
630 if tasklist
is not None:
644 for task
in tasklist:
645 if not task.isUpdating:
647 self.
nIdle += task.nIdle