9 import xml.etree.ElementTree
as ET
18 from httplib
import HTTPException
19 from multiprocessing
import Process, Queue
21 from CRABAPI.RawCommand
import crabCommand
22 from CRABClient.UserUtilities
import getConsoleLogLevel, setConsoleLogLevel
23 from CRABClient.ClientUtilities
import LOGLEVEL_MUTE
24 from CRABClient.ClientExceptions
import CachefileNotFoundException
35 def __init__(self, debug=0, logger = None , workingArea = None, voGroup = None, username = None):
37 setConsoleLogLevel(LOGLEVEL_MUTE)
39 if workingArea
is not None:
44 if voGroup
is not None:
48 if username
is not None:
53 if logger
is not None:
54 self.
logger = logger.getChild(
"CrabController")
57 self.
logger = logging.getLogger(
"CrabController")
61 if len(logging.getLogger().handlers) < 1 :
62 ch = logging.FileHandler(
'crabController.log', mode=
'a', encoding=
None, delay=
False)
63 ch.setLevel(logging.DEBUG)
65 formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s' )
67 ch.setFormatter(formatter)
68 self.logger.addHandler(ch)
83 self.logger.info(
"Checking if user can write to /store/user/%s on site %s with voGroup %s"%(self.
username,site , self.
voGroup) )
84 if not 'noPath' in path:
85 res = crabCommand(
'checkwrite',
'--site',site,
'--voGroup',self.
voGroup,
'--lfn', path)
87 res = crabCommand(
'checkwrite',
'--site',site,
'--voGroup',self.
voGroup)
88 if res[
'status'] ==
'SUCCESS':
89 self.logger.info(
"Checkwrite was sucessfully called.")
92 self.logger.error(
"The crab checkwrite command failed for site: %s"%site )
95 self.logger.error(
'Unable to perform crab checkwrite')
106 self.logger.info(
'Dry-run: You may check the created config and sandbox')
109 self.logger.info(
"crab sumbit called for task %s"%name)
111 self.logger.info(
str(res))
123 self.logger.info(
'Dry-run: Created config file. ')
133 self.logger.info(
"crab resumbit called for task %s"%name)
144 username = os.environ[
"CERNUSERNAME"]
147 res = crabCommand(
'checkusername')
150 return res[
'username']
162 self.logger.info(
'Dry-run: Created config file. crab command would have been: %s'%cmd)
165 if not "crab_" in name:
166 callname =
"crab_" + name
171 if 'taskFailureMsg' in res
and 'jobs' in res:
172 return res[
'status'], res[
'jobs'], res[
'taskFailureMsg']
173 elif 'jobs' in res
and 'taskFailureMsg' not in res:
174 return res[
'status'], res[
'jobs'],
None 175 elif 'jobs' not in res
and 'taskFailureMsg' in res:
176 return res[
'status'], {},res[
'taskFailureMsg']
178 return res[
'status'],{},
None 179 except Exception
as e:
181 self.logger.error(
"Can not run crab status request")
182 return "NOSTATE",{},
None 189 crabCommandProcessArgs = (self.
crab_q, crabArgs)
190 p = Process(target=crabCommandProcess, args=(crabCommandProcessArgs))
192 res = self.crab_q.get()
205 return res[
'success'], res[
'failed']
207 self.logger.error(
"Error calling crab getlog for %s" %foldername)
218 return res[
'analyzedLumis']
220 self.logger.error(
"Error calling crab report for %s" %foldername)
229 if os.path.exists(name):
232 pset =
'crab_%s_cfg.py' % name
233 with open( pset,
'r') as cfgfile: 234 cfo = imp.load_source("pycfg", pset, cfgfile )
248 dirlist = [ x
for x
in os.listdir( self.
workingArea )
if (x.startswith(
'crab_')
and os.path.isdir( os.path.join(self.
workingArea,x) ) )]
255 if name.startswith(
"crab_"):
256 crabfolder =
'%s'%name
258 crabfolder =
"crab_%s "%name
259 return crabfolder.strip()
274 (currentoptions, args ) = parser.parse_args([
" "])
288 if not hasattr(currentoptions,
'dry_run'):
289 parser.add_option(
'--dry-run', action=
'store_true', default=
False,
290 help=
'Do everything except calling CRAB or registering samples to the database.' )
291 if not hasattr(currentoptions,
'workingArea'):
292 parser.add_option(
'--workingArea',metavar=
'DIR',default=os.getcwd(),help=
'The area (full or relative path) where to create the CRAB project directory. ' 293 'If the area doesn\'t exist, CRAB will try to create it using the mkdir command' \
294 ' (without -p option). Defaults to the current working directory.' )
321 res = crabCommand(*crabCommandArgs)
323 except HTTPException
as e:
324 print "crab error ---------------" 326 print "end error ---------------" 327 print "will try again!" 330 except CachefileNotFoundException
as e:
331 print "crab error ---------------" 333 print "end error ---------------" 334 print crabCommandArgs
335 res={
'status':
"CachefileNotFound",
'jobs':{}}
338 res={
'status':
"UnexpectedError",
'jobs':{}}
344 p = subprocess.Popen(
"voms-proxy-info --fqan",
345 stdout = subprocess.PIPE,
346 stderr = subprocess.PIPE,
348 stdout, stderr = p.communicate()
350 if p.returncode != 0:
355 lines = stdout.split(
"\n")
356 splitline = lines[0].
split(
"/")
357 if len(splitline) < 4:
358 splitline = lines[1].
split(
"/")
359 self.
vo = splitline[1]
381 crabController =
None ,
383 debuglevel =
"ERROR",
397 raise ValueError(
"Either taskname or crab_config needs to be set")
398 if not os.path.exists( crab_config):
399 raise IOError(
"File %s not found" % crab_config )
400 self.
name = crab_config
401 self.
name = self.crabConfig.General.requestName
405 self.
log = logging.getLogger(
'crabTask' )
406 self.log.setLevel(logging._levelNames[ debuglevel ])
425 self.
lastUpdate = datetime.datetime.now().strftime(
"%Y-%m-%d_%H.%M.%S" )
450 test = self.crabConfig.Data.lumiMask
453 if self.name.startswith(
"Data_" ):
473 return self.crabConfig.Data.inputDataset
482 if os.path.exists( os.path.join( self.crabConfig.General.workArea, crab._prepareFoldername( self.
name ) ) ):
483 self.
_crabFolder = os.path.join( self.crabConfig.General.workArea, crab._prepareFoldername( self.
name ) )
485 alternative_path = os.path.join(os.path.cwd(), crab._prepareFoldername( self.
name ) )
486 if os.path.exists( alternative_path ):
489 self.log.error(
"Unable to find folder for Task")
498 for jobkey
in self.jobs.keys():
499 job = self.
jobs[jobkey]
500 if job[
'State'] ==
'failed':
501 failedJobIds.append( job[
'JobIds'][-1] )
502 controller.resubmit( self.
name, joblist = failedJobIds )
503 self.
lastUpdate = datetime.datetime.now().strftime(
"%Y-%m-%d_%H.%M.%S" )
507 return os.path.join( self.crabConfig.General.workArea,
508 "crab_" + self.crabConfig.General.requestName)
514 self.log.debug(
"Start update for task %s" % self.
name )
517 self.
state =
"UPDATING" 520 self.log.debug(
"Try to get status for task" )
522 self.log.debug(
"Found state: %s" % self.
state )
523 if self.
state==
"FAILED":
529 if self.
state ==
"NOSTATE":
530 self.log.debug(
"Trying to resubmit because of NOSTATE" )
535 self.
lastUpdate = datetime.datetime.now().strftime(
"%Y-%m-%d_%H.%M.%S" )
543 if "The CRAB3 server backend could not resubmit your task because the Grid scheduler answered with an error." in task.failureReason:
545 cmd =
'mv %s bak_%s' %(crab._prepareFoldername( self.
name ),crab._prepareFoldername( self.
name ))
546 p = subprocess.Popen(cmd,stdout=subprocess.PIPE, shell=
True)
547 (out,err) = p.communicate()
548 self.
state =
"SHEDERR" 549 configName =
'%s_cfg.py' %(crab._prepareFoldername( self.
name ))
550 crab.submit( configName )
552 elif task.failureReason
is not None:
553 self.
state =
"ERRHANDLE" 554 crab.resubmit( self.
name )
564 jobKeys = sorted(self.jobs.keys())
566 intJobkeys = [
int(x)
for x
in jobKeys]
568 print "error parsing job numers to int" 572 stateDict = {
'unsubmitted':0,
'idle':0,
'running':0,
'transferring':0,
'cooloff':0,
'failed':0,
'finished':0}
579 for statekey
in stateDict.keys():
580 if statekey
in job[
'State']:
581 stateDict[statekey]+=1
583 if dCacheFileList
is not None:
584 outputFilename =
"%s_%s"%( self.
name, key)
585 if 'finished' in statekey
and any(outputFilename
in s
for s
in dCacheFileList):
588 for state
in stateDict:
589 attrname =
"n" + state.capitalize()
590 setattr(self, attrname, stateDict[state])
599 JobNumber = logArchName.split(
"/")[-1].
split(
"_")[1].
split(
".")[0]
600 log = {
'readEvents' : 0}
601 with tarfile.open( logArchName,
"r") as tar: 603 JobXmlFile = tar.extractfile(
'FrameworkJobReport-%s.xml' % JobNumber)
604 root = ET.fromstring( JobXmlFile.read() )
606 if child.tag ==
'InputFile':
607 for subchild
in child:
608 if subchild.tag ==
'EventsRead':
609 nEvents =
int(subchild.text)
610 log.update({
'readEvents' : nEvents})
614 print "Can not parse / read %s" % logArchName
629 if tasklist
is not None:
643 for task
in tasklist:
644 if not task.isUpdating:
646 self.
nIdle += task.nIdle
def updateJobStats(self, dCacheFileList=None)
Function to update JobStatistics.
def __init__(self, taskname="", crab_config="", crabController=None, initUpdate=True, debuglevel="ERROR", datasetpath="", localDir="", outlfn="")
The object constructor.
def status(self, name)
Check crab status.
bool any(const std::vector< T > &v, const T &what)
def _prepareFoldername(self, name)
Add crab_ to Foldername if needed.
def __init__(self, tasklist=None)
The object constructor.
def commandlineOptions(self, parser=optparse.OptionParser( 'usage:%prog'))
Populates an existing optparse parser or returns a new one with options for crab functions.
The CrabController class.
def handleNoState(self)
Function to handle Task which received NOSTATE status.
def checkusername(self)
Returns the hn name for a user with valid proxy.
def checkwrite(self, site='T2_DE_RWTH', path='noPath')
Check if crab can write to specified site.
def submit(self, name)
Check if crab can write to specified site.
def clearStats(self)
This function sets all counts to zero.
Class for a single CrabRequest e This class represents one crab3 task/request.
def resubmit(self, name, joblist=None)
Resubmit all failed tasks in job or specified list of jobs in task.
def crabFolders(self)
Return list of all crab folders in workin area (default cwd)
def update(self)
Function to update Task in associated Jobs.
def getlog(self, name)
Call crab getlog.
def readLogArch(self, logArchName)
Function to read log info from log.tar.gz.
def resubmit_failed(self)
Function to resubmit failed jobs in tasks.
def updateStats(self, tasklist)
This function updates the statistics for a given tasklist.
def crabCommandProcess(q, crabCommandArgs)
Function to run crab command in a new process.
def __init__(self, debug=0, logger=None, workingArea=None, voGroup=None, username=None)
The constructor.
Class holds job statistics for several Crab tasks.
def callCrabCommand(self, crabArgs)
Call crab command in a new process and return result dict.
def readCrabConfig(self, name)
Read a crab config and return python object.
def report(self, name)
Call crab report command and return path to lumiSummary.
def crabConfig(self)
Function to access crab config object or read it if unititalized.
def isData(self)
Property function to find out if task runs on data.