7 from __future__
import print_function
10 import xml.etree.ElementTree
as ET
19 from httplib
import HTTPException
20 from multiprocessing
import Process, Queue
22 from CRABAPI.RawCommand
import crabCommand
23 from CRABClient.UserUtilities
import getConsoleLogLevel, setConsoleLogLevel
24 from CRABClient.ClientUtilities
import LOGLEVEL_MUTE
25 from CRABClient.ClientExceptions
import CachefileNotFoundException
36 def __init__(self, debug=0, logger = None , workingArea = None, voGroup = None, username = None):
38 setConsoleLogLevel(LOGLEVEL_MUTE)
40 if workingArea
is not None:
45 if voGroup
is not None:
49 if username
is not None:
54 if logger
is not None:
55 self.
logger = logger.getChild(
"CrabController")
58 self.
logger = logging.getLogger(
"CrabController")
62 if len(logging.getLogger().handlers) < 1 :
63 ch = logging.FileHandler(
'crabController.log', mode=
'a', encoding=
None, delay=
False)
64 ch.setLevel(logging.DEBUG)
66 formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s' )
68 ch.setFormatter(formatter)
69 self.logger.addHandler(ch)
84 self.logger.info(
"Checking if user can write to /store/user/%s on site %s with voGroup %s"%(self.
username,site , self.
voGroup) )
85 if not 'noPath' in path:
86 res = crabCommand(
'checkwrite',
'--site',site,
'--voGroup',self.
voGroup,
'--lfn', path)
88 res = crabCommand(
'checkwrite',
'--site',site,
'--voGroup',self.
voGroup)
89 if res[
'status'] ==
'SUCCESS':
90 self.logger.info(
"Checkwrite was sucessfully called.")
93 self.logger.error(
"The crab checkwrite command failed for site: %s"%site )
96 self.logger.error(
'Unable to perform crab checkwrite')
107 self.logger.info(
'Dry-run: You may check the created config and sandbox')
110 self.logger.info(
"crab sumbit called for task %s"%name)
112 self.logger.info(
str(res))
124 self.logger.info(
'Dry-run: Created config file. ')
134 self.logger.info(
"crab resumbit called for task %s"%name)
145 username = os.environ[
"CERNUSERNAME"]
148 res = crabCommand(
'checkusername')
151 return res[
'username']
163 self.logger.info(
'Dry-run: Created config file. crab command would have been: %s'%cmd)
166 if not "crab_" in name:
167 callname =
"crab_" + name
172 if 'taskFailureMsg' in res
and 'jobs' in res:
173 return res[
'status'], res[
'jobs'], res[
'taskFailureMsg']
174 elif 'jobs' in res
and 'taskFailureMsg' not in res:
175 return res[
'status'], res[
'jobs'],
None 176 elif 'jobs' not in res
and 'taskFailureMsg' in res:
177 return res[
'status'], {},res[
'taskFailureMsg']
179 return res[
'status'],{},
None 180 except Exception
as e:
182 self.logger.error(
"Can not run crab status request")
183 return "NOSTATE",{},
None 190 crabCommandProcessArgs = (self.
crab_q, crabArgs)
191 p = Process(target=crabCommandProcess, args=(crabCommandProcessArgs))
193 res = self.crab_q.get()
206 return res[
'success'], res[
'failed']
208 self.logger.error(
"Error calling crab getlog for %s" %foldername)
219 return res[
'analyzedLumis']
221 self.logger.error(
"Error calling crab report for %s" %foldername)
230 if os.path.exists(name):
233 pset =
'crab_%s_cfg.py' % name
234 with open( pset,
'r') as cfgfile: 235 cfo = imp.load_source("pycfg", pset, cfgfile )
249 dirlist = [ x
for x
in os.listdir( self.
workingArea )
if (x.startswith(
'crab_')
and os.path.isdir( os.path.join(self.
workingArea,x) ) )]
256 if name.startswith(
"crab_"):
257 crabfolder =
'%s'%name
259 crabfolder =
"crab_%s "%name
260 return crabfolder.strip()
275 (currentoptions, args ) = parser.parse_args([
" "])
289 if not hasattr(currentoptions,
'dry_run'):
290 parser.add_option(
'--dry-run', action=
'store_true', default=
False,
291 help=
'Do everything except calling CRAB or registering samples to the database.' )
292 if not hasattr(currentoptions,
'workingArea'):
293 parser.add_option(
'--workingArea',metavar=
'DIR',default=os.getcwd(),help=
'The area (full or relative path) where to create the CRAB project directory. ' 294 'If the area doesn\'t exist, CRAB will try to create it using the mkdir command' \
295 ' (without -p option). Defaults to the current working directory.' )
322 res = crabCommand(*crabCommandArgs)
324 except HTTPException
as e:
325 print(
"crab error ---------------")
327 print(
"end error ---------------")
328 print(
"will try again!")
331 except CachefileNotFoundException
as e:
332 print(
"crab error ---------------")
334 print(
"end error ---------------")
335 print(crabCommandArgs)
336 res={
'status':
"CachefileNotFound",
'jobs':{}}
339 res={
'status':
"UnexpectedError",
'jobs':{}}
345 p = subprocess.Popen(
"voms-proxy-info --fqan",
346 stdout = subprocess.PIPE,
347 stderr = subprocess.PIPE,
349 stdout, stderr = p.communicate()
351 if p.returncode != 0:
356 lines = stdout.split(
"\n")
357 splitline = lines[0].
split(
"/")
358 if len(splitline) < 4:
359 splitline = lines[1].
split(
"/")
360 self.
vo = splitline[1]
382 crabController =
None ,
384 debuglevel =
"ERROR",
398 raise ValueError(
"Either taskname or crab_config needs to be set")
399 if not os.path.exists( crab_config):
400 raise IOError(
"File %s not found" % crab_config )
401 self.
name = crab_config
402 self.
name = self.crabConfig.General.requestName
406 self.
log = logging.getLogger(
'crabTask' )
407 self.log.setLevel(logging._levelNames[ debuglevel ])
426 self.
lastUpdate = datetime.datetime.now().strftime(
"%Y-%m-%d_%H.%M.%S" )
451 test = self.crabConfig.Data.lumiMask
454 if self.name.startswith(
"Data_" ):
474 return self.crabConfig.Data.inputDataset
483 if os.path.exists( os.path.join( self.crabConfig.General.workArea, crab._prepareFoldername( self.
name ) ) ):
484 self.
_crabFolder = os.path.join( self.crabConfig.General.workArea, crab._prepareFoldername( self.
name ) )
486 alternative_path = os.path.join(os.path.cwd(), crab._prepareFoldername( self.
name ) )
487 if os.path.exists( alternative_path ):
490 self.log.error(
"Unable to find folder for Task")
499 for jobkey
in self.jobs.keys():
500 job = self.
jobs[jobkey]
501 if job[
'State'] ==
'failed':
502 failedJobIds.append( job[
'JobIds'][-1] )
503 controller.resubmit( self.
name, joblist = failedJobIds )
504 self.
lastUpdate = datetime.datetime.now().strftime(
"%Y-%m-%d_%H.%M.%S" )
508 return os.path.join( self.crabConfig.General.workArea,
509 "crab_" + self.crabConfig.General.requestName)
515 self.log.debug(
"Start update for task %s" % self.
name )
518 self.
state =
"UPDATING" 521 self.log.debug(
"Try to get status for task" )
523 self.log.debug(
"Found state: %s" % self.
state )
524 if self.
state==
"FAILED":
530 if self.
state ==
"NOSTATE":
531 self.log.debug(
"Trying to resubmit because of NOSTATE" )
536 self.
lastUpdate = datetime.datetime.now().strftime(
"%Y-%m-%d_%H.%M.%S" )
544 if "The CRAB3 server backend could not resubmit your task because the Grid scheduler answered with an error." in task.failureReason:
546 cmd =
'mv %s bak_%s' %(crab._prepareFoldername( self.
name ),crab._prepareFoldername( self.
name ))
547 p = subprocess.Popen(cmd,stdout=subprocess.PIPE, shell=
True)
548 (out,err) = p.communicate()
549 self.
state =
"SHEDERR" 550 configName =
'%s_cfg.py' %(crab._prepareFoldername( self.
name ))
551 crab.submit( configName )
553 elif task.failureReason
is not None:
554 self.
state =
"ERRHANDLE" 555 crab.resubmit( self.
name )
565 jobKeys = sorted(self.jobs.keys())
567 intJobkeys = [
int(x)
for x
in jobKeys]
569 print(
"error parsing job numers to int")
573 stateDict = {
'unsubmitted':0,
'idle':0,
'running':0,
'transferring':0,
'cooloff':0,
'failed':0,
'finished':0}
580 for statekey
in stateDict.keys():
581 if statekey
in job[
'State']:
582 stateDict[statekey]+=1
584 if dCacheFileList
is not None:
585 outputFilename =
"%s_%s"%( self.
name, key)
586 if 'finished' in statekey
and any(outputFilename
in s
for s
in dCacheFileList):
589 for state
in stateDict:
590 attrname =
"n" + state.capitalize()
591 setattr(self, attrname, stateDict[state])
600 JobNumber = logArchName.split(
"/")[-1].
split(
"_")[1].
split(
".")[0]
601 log = {
'readEvents' : 0}
602 with tarfile.open( logArchName,
"r") as tar: 604 JobXmlFile = tar.extractfile(
'FrameworkJobReport-%s.xml' % JobNumber)
605 root = ET.fromstring( JobXmlFile.read() )
607 if child.tag ==
'InputFile':
608 for subchild
in child:
609 if subchild.tag ==
'EventsRead':
610 nEvents =
int(subchild.text)
611 log.update({
'readEvents' : nEvents})
615 print(
"Can not parse / read %s" % logArchName)
630 if tasklist
is not None:
644 for task
in tasklist:
645 if not task.isUpdating:
647 self.
nIdle += task.nIdle
def updateJobStats(self, dCacheFileList=None)
Function to update JobStatistics.
std::vector< std::string_view > split(std::string_view, const char *)
def __init__(self, taskname="", crab_config="", crabController=None, initUpdate=True, debuglevel="ERROR", datasetpath="", localDir="", outlfn="")
The object constructor.
def status(self, name)
Check crab status.
bool any(const std::vector< T > &v, const T &what)
def _prepareFoldername(self, name)
Add crab_ to Foldername if needed.
def __init__(self, tasklist=None)
The object constructor.
def commandlineOptions(self, parser=optparse.OptionParser( 'usage:%prog'))
Populates an existing optparse parser or returns a new one with options for crab functions.
The CrabController class.
def handleNoState(self)
Function to handle Task which received NOSTATE status.
S & print(S &os, JobReport::InputFile const &f)
def checkusername(self)
Returns the hn name for a user with valid proxy.
def checkwrite(self, site='T2_DE_RWTH', path='noPath')
Check if crab can write to specified site.
def submit(self, name)
Submit the crab task.
def clearStats(self)
This function sets all counts to zero.
Class for a single CrabRequest. This class represents one crab3 task/request.
def resubmit(self, name, joblist=None)
Resubmit all failed tasks in job or specified list of jobs in task.
def crabFolders(self)
Return list of all crab folders in working area (default cwd)
def update(self)
Function to update Task in associated Jobs.
def getlog(self, name)
Call crab getlog.
def readLogArch(self, logArchName)
Function to read log info from log.tar.gz.
def resubmit_failed(self)
Function to resubmit failed jobs in tasks.
def updateStats(self, tasklist)
This function updates the statistics for a given tasklist.
def crabCommandProcess(q, crabCommandArgs)
Function to run crab command in a new process.
def __init__(self, debug=0, logger=None, workingArea=None, voGroup=None, username=None)
The constructor.
Class holds job statistics for several Crab tasks.
def callCrabCommand(self, crabArgs)
Call crab command in a new process and return result dict.
def readCrabConfig(self, name)
Read a crab config and return python object.
def report(self, name)
Call crab report command and return path to lumiSummary.
def crabConfig(self)
Function to access crab config object or read it if uninitialized.
def isData(self)
Property function to find out if task runs on data.