
crabFunctions.py
1 #!/usr/bin/env python
2 
7 from __future__ import print_function
8 import os,sys,glob
9 import tarfile
10 import xml.etree.ElementTree as ET
11 import imp
12 import json
13 import optparse
14 import subprocess
15 import logging
16 import datetime
17 import uuid
18 import time
19 from http.client import HTTPException
20 from multiprocessing import Process, Queue
21 
22 from CRABAPI.RawCommand import crabCommand
23 from CRABClient.UserUtilities import getConsoleLogLevel, setConsoleLogLevel
24 from CRABClient.ClientUtilities import LOGLEVEL_MUTE
25 from CRABClient.ClientExceptions import CachefileNotFoundException
26 
27 
30 
32 
33 class CrabController:
36  def __init__(self, debug=0, logger = None , workingArea = None, voGroup = None, username = None):
37 
38  setConsoleLogLevel(LOGLEVEL_MUTE)
39  self.debug = debug
40  if workingArea is not None:
41  self.workingArea = workingArea
42  else:
43  self.workingArea = os.getcwd()
44  self.dry_run = False
45  if voGroup is not None:
46  self.voGroup = voGroup
47  else:
48  self.voGroup = "dcms"
49  if username is not None:
50  self.username = username
51  else:
52  self.username = None
53 
54  if logger is not None:
55  self.logger = logger.getChild("CrabController")
56  else:
57  # add instance logger as logger to root
58  self.logger = logging.getLogger("CrabController")
59  # check if handlers are present for the root logger;
60  # if no handler is present we assume that logging has not been
61  # configured yet and attach our own file handler
62  if len(logging.getLogger().handlers) < 1 :
63  ch = logging.FileHandler('crabController.log', mode='a', encoding=None, delay=False)
64  ch.setLevel(logging.DEBUG)
65  # create formatter
66  formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )
67  # add formatter to ch
68  ch.setFormatter(formatter)
69  self.logger.addHandler(ch)
70 
71  self.crab_q = Queue()
72 
81  def checkwrite(self,site='T2_DE_RWTH',path='noPath'):
82  if self.username is None: self.checkusername()
83  try:
84  self.logger.info( "Checking if user can write to /store/user/%s on site %s with voGroup %s"%(self.username,site , self.voGroup) )
85  if 'noPath' not in path:
86  res = crabCommand('checkwrite','--site',site,'--voGroup',self.voGroup,'--lfn', path)
87  else:
88  res = crabCommand('checkwrite','--site',site,'--voGroup',self.voGroup)
89  if res['status'] == 'SUCCESS':
90  self.logger.info("Checkwrite was successfully called.")
91  return True
92  else:
93  self.logger.error( "The crab checkwrite command failed for site: %s"%site )
94  return False
95  except:
96  self.logger.error( 'Unable to perform crab checkwrite')
97  return False
98 
99 
104  def submit(self,name):
105  if self.dry_run:
106  res = self.callCrabCommand(('submit', '--dryrun', name))
107  self.logger.info('Dry-run: You may check the created config and sandbox')
108  else:
109  res = self.callCrabCommand(('submit','--wait' , name))
110  self.logger.info("crab submit called for task %s"%name)
111  if self.debug > 1:
112  self.logger.info(str(res))
113  return res
114 
122  def resubmit(self,name,joblist = None):
123  if self.dry_run:
124  self.logger.info('Dry-run: Created config file. ')
125  return {}
126  #~ if joblist is not None:
127  #~ jobstring ="%s"%','.join(joblist)
128  #~ cmd = ('resubmit','--wait', '--jobids=',jobstring, os.path.join(self.workingArea,self._prepareFoldername(name)) )
132  cmd = ('resubmit','--wait', os.path.join(self.workingArea,self._prepareFoldername(name)) )
133  res = self.callCrabCommand( cmd )
134  self.logger.info("crab resubmit called for task %s"%name)
135  return res
136 
141  def checkusername(self):
142  #deprecated string: cmd = 'crab checkHNname --voGroup=dcms'
143  #~ cmd = 'crab checkusername --voGroup=dcms'
144  try:
145  self.username = os.environ["CERNUSERNAME"]
146  return self.username
147  except: pass
148  res = crabCommand('checkusername')
149  try:
150  self.username = res['username']
151  return res['username']
152  except:
153  return "noHNname"
154 
155 
161  def status(self,name):
162  if self.dry_run:
163  self.logger.info('Dry-run: crab status would have been called for task %s'%name)
164  else:
165  try:
166  if "crab_" not in name:
167  callname = "crab_" + name
168  else:
169  callname = name
170  res = self.callCrabCommand( ('status', '--long', callname) )
171  if 'taskFailureMsg' in res and 'jobs' in res:
172  return res['status'], res['jobs'], res['taskFailureMsg']
173  elif 'jobs' in res and 'taskFailureMsg' not in res:
174  return res['status'], res['jobs'],None
175  elif 'jobs' not in res and 'taskFailureMsg' in res:
176  return res['status'], {},res['taskFailureMsg']
177  else:
178  return res['status'],{},None
179  except Exception as e:
180  print(e)
181  self.logger.error("Cannot run crab status request")
182  return "NOSTATE",{},None
183 
184 
188  def callCrabCommand( self, crabArgs ):
189  crabCommandProcessArgs = (self.crab_q, crabArgs)
190  p = Process(target=crabCommandProcess, args=crabCommandProcessArgs)
191  p.start()
192  res = self.crab_q.get()
193  p.join()
194  return res
195 
196 
200  def getlog(self, name):
201  foldername = self._prepareFoldername( name)
202  try:
203  #res = crabCommand('--quiet','status', dir = 'crab_%s' % name)
204  res = self.callCrabCommand( ('getlog', '%s' % foldername) )
205  return res['success'], res['failed']
206  except:
207  self.logger.error("Error calling crab getlog for %s" %foldername)
208  return {}, {}
209 
210 
214  def report(self, name):
215  foldername = self._prepareFoldername( name)
216  try:
217  res = self.callCrabCommand( ('report', '%s' % foldername) )
218  return res['analyzedLumis']
219  except:
220  self.logger.error("Error calling crab report for %s" %foldername)
221 
222 
223 
227  def readCrabConfig( self, name ):
228  try:
229  if os.path.exists(name):
230  pset = name
231  else:
232  pset = 'crab_%s_cfg.py' % name
233  with open( pset, 'r') as cfgfile:
234  cfo = imp.load_source("pycfg", pset, cfgfile )
235  config = cfo.config
236  del cfo
237  return config
238  except:
239  return False
240 
241 
245  @property
246  def crabFolders(self):
248  dirlist = [ x for x in os.listdir( self.workingArea ) if (x.startswith('crab_') and os.path.isdir( os.path.join(self.workingArea,x) ) )]
249  return dirlist
250 
251 
254  def _prepareFoldername(self, name):
255  if name.startswith("crab_"):
256  crabfolder = '%s'%name
257  else:
258  crabfolder = "crab_%s"%name
259  return crabfolder.strip()
260 
271  def commandlineOptions(self,parser = optparse.OptionParser( 'usage: %prog' )):
272  # we first need to call parse_args with a dummy string at the beginning to
273  # check for existing options later
274  (currentoptions, args ) = parser.parse_args([" "])
275 
276  # The following block shows how options should be added when
277  # conflicts are possible; if the option is already defined and is
278  # overridden by this function, an OptionConflictError is raised
279  #try:
280  # parser.add_option( '--someOption',metavar='DIR',default=None,
281  # help='Dummy option for future integration')
282  #except OptionConflictError as e:
283  # conditionalLog(crablog,"There are conflicts extending the optparse options object",'error')
284  # conditionalLog(crablog,e.strerror,'error')
285 
286  # options below are only added if they do not already exist in the
287  # given parser
288  if not hasattr(currentoptions, 'dry_run'):
289  parser.add_option( '--dry-run', action='store_true', default=False,
290  help='Do everything except calling CRAB or registering samples to the database.' )
291  if not hasattr(currentoptions, 'workingArea'):
292  parser.add_option( '--workingArea',metavar='DIR',default=os.getcwd(),help='The area (full or relative path) where to create the CRAB project directory. '
293  'If the area doesn\'t exist, CRAB will try to create it using the mkdir command' \
294  ' (without -p option). Defaults to the current working directory.' )
295 
296 
297  # Some options can be added without expected trouble with other parser
298  # parts, simply because it is quite fixed what they represent.
299  # Those options should be added here and will not raise an exception
300  # if they already exist in the parser
301  #parser.set_conflict_handler('resolve')
302  #parser.add_option( '--someOption',metavar='DIR',default=None,
303  # help='Dummy option for future integration')
304 
305  return parser
306 
307 
308 
309 
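# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): how the
# CrabController above might be driven from a small helper script.  The config
# file name, task name and site below are hypothetical placeholders, and a
# valid grid proxy plus a working CRAB3 environment are assumed.
# ---------------------------------------------------------------------------
def _example_controller_usage(config="crab_MyTask_cfg.py", taskname="MyTask",
                              site="T2_DE_RWTH"):
    controller = CrabController(voGroup="dcms")
    # verify that the user may write to the storage site before submitting
    if not controller.checkwrite(site=site):
        return None
    # submit the (hypothetical) crab config and query the task afterwards
    controller.submit(config)
    state, jobs, failureReason = controller.status(taskname)
    return state, len(jobs), failureReason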
315 def crabCommandProcess(q,crabCommandArgs):
316  # give crab3 the chance for one server glitch
317  i=0
318  while True:
319  i+=1
320  try:
321  res = crabCommand(*crabCommandArgs)
322  break
323  except HTTPException as e:
324  print("crab error ---------------")
325  print(e)
326  print("end error ---------------")
327  print("will try again!")
329  time.sleep(5)
330  except CachefileNotFoundException as e:
331  print("crab error ---------------")
332  print(e)
333  print("end error ---------------")
334  print(crabCommandArgs)
335  res={ 'status':"CachefileNotFound",'jobs':{}}
336  break
337  if i>5:
338  res={ 'status':"UnexpectedError",'jobs':{}}
339  break
340  q.put( res )
341 
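# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): crabCommandProcess is
# normally invoked through CrabController.callCrabCommand, but it can also be
# run directly in a worker process with a Queue, as assumed below.
# ---------------------------------------------------------------------------
def _example_crabCommandProcess_usage():
    q = Queue()
    # run 'crab checkusername' in a separate process so the caller benefits
    # from the retry loop above in case of server glitches
    p = Process(target=crabCommandProcess, args=(q, ("checkusername",)))
    p.start()
    res = q.get()
    p.join()
    return res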
342 class CertInfo:
343  def __init__( self ):
344  p = subprocess.Popen("voms-proxy-info --fqan",
345  stdout = subprocess.PIPE,
346  stderr = subprocess.PIPE,
347  shell=True)
348  stdout, stderr = p.communicate()
349  print(stdout)
350  if p.returncode != 0:
351  self.vo = ""
352  self.voGroup = ""
353  self.voRole = ""
354  else:
355  lines = stdout.decode().split("\n")
356  splitline = lines[0].split("/")
357  if len(splitline) < 4:
358  splitline = lines[1].split("/")
359  self.vo = splitline[1]
360  self.voGroup = splitline[2]
361  try: # when the proxy has no group, splitline[2] holds the Role=... entry
362  self.voRole = splitline[2].split("=")[1]
363  if "NULL" in self.voRole:
364  self.voGroup = ""
365  except:
366  self.voRole = ""
367 
368 
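# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): the VO information
# parsed by CertInfo can be used to pick the voGroup for a CrabController.
# Falling back to "dcms" when no group is set is an assumption of this example.
# ---------------------------------------------------------------------------
def _example_certinfo_usage():
    cert = CertInfo()
    voGroup = cert.voGroup if cert.voGroup else "dcms"
    return CrabController(voGroup=voGroup)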
371 class CrabTask:
372 
373 
378  def __init__(self,
379  taskname="",
380  crab_config="",
381  crabController = None ,
382  initUpdate = True,
383  debuglevel = "ERROR",
384  datasetpath = "",
385  localDir = "",
386  outlfn = ""):
387 
388  # crab config as a python object should only be used via .config
389  self._crabConfig = None
390 
391  self._crabFolder = None
392 
393  if taskname:
394  self.name = taskname
395  else:
396  if not crab_config:
397  raise ValueError("Either taskname or crab_config needs to be set")
398  if not os.path.exists( crab_config):
399  raise IOError("File %s not found" % crab_config )
400  self.name = crab_config # point name at the config path so self.crabConfig can read it
401  self.name = self.crabConfig.General.requestName
402  self.uuid = uuid.uuid4()
403  #~ self.lock = multiprocessing.Lock()
404  #setup logging
405  self.log = logging.getLogger( 'crabTask' )
406  self.log.setLevel(logging.getLevelName(debuglevel))
407  self.jobs = {}
408  self.localDir = localDir
409  self.outlfn = outlfn
410  self.isUpdating = False
411  self.taskId = -1
412  #variables for statistics
413  self.nJobs = 0
414  self.state = "NOSTATE"
415  self.maxjobnumber = 0
416  self.nUnsubmitted = 0
417  self.nIdle = 0
418  self.nRunning = 0
419  self.nTransferring = 0
420  self.nCooloff = 0
421  self.nFailed = 0
422  self.nFinished = 0
423  self.nComplete = 0
424  self.failureReason = None
425  self.lastUpdate = datetime.datetime.now().strftime( "%Y-%m-%d_%H.%M.%S" )
426 
427  self._isData = None
428  self.resubmitCount = 0
429 
430  self.debug = False
431 
432  self.finalFiles = []
433  self.totalEvents = 0
434 
435 
436  self._datasetpath_default = datasetpath
437 
438  #start with first updates
439  if initUpdate:
440  self.update()
441  self.updateJobStats()
442 
443 
446  @property
447  def isData( self ):
448  if self._isData is None:
449  try:
450  test = self.crabConfig.Data.lumiMask
451  self._isData = True
452  except:
453  if self.name.startswith( "Data_" ):
454  self._isData = True
455  else:
456  self._isData = False
457  return self._isData
458 
459 
460 
463  @property
464  def crabConfig( self ):
465  if self._crabConfig is None:
466  crab = CrabController()
467  self._crabConfig = crab.readCrabConfig( self.name )
468  return self._crabConfig
469 
470  @property
471  def datasetpath( self ):
472  try:
473  return self.crabConfig.Data.inputDataset
474  except:
475  pass
476  return self._datasetpath_default
477 
478  @property
479  def crabFolder( self ):
480  if self._crabFolder is not None: return self._crabFolder
481  crab = CrabController()
482  if os.path.exists( os.path.join( self.crabConfig.General.workArea, crab._prepareFoldername( self.name ) ) ):
483  self._crabFolder = os.path.join( self.crabConfig.General.workArea, crab._prepareFoldername( self.name ) )
484  return self._crabFolder
485  alternative_path = os.path.join(os.getcwd(), crab._prepareFoldername( self.name ) )
486  if os.path.exists( alternative_path ):
487  self._crabFolder = alternative_path
488  return self._crabFolder
489  self.log.error( "Unable to find folder for Task")
490  return ""
491 
492 
495  def resubmit_failed( self ):
496  failedJobIds = []
497  controller = CrabController()
498  for jobkey in self.jobs.keys():
499  job = self.jobs[jobkey]
500  if job['State'] == 'failed':
501  failedJobIds.append( job['JobIds'][-1] )
502  controller.resubmit( self.name, joblist = failedJobIds )
503  self.lastUpdate = datetime.datetime.now().strftime( "%Y-%m-%d_%H.%M.%S" )
504 
505  @property
506  def crab_folder(self):
507  return os.path.join( self.crabConfig.General.workArea,
508  "crab_" + self.crabConfig.General.requestName)
509 
512  def update(self):
513  #~ self.lock.acquire()
514  self.log.debug( "Start update for task %s" % self.name )
515  self.isUpdating = True
516  controller = CrabController()
517  self.state = "UPDATING"
518  # check if we should drop this sample due to missing info
519 
520  self.log.debug( "Try to get status for task" )
521  self.state , self.jobs,self.failureReason = controller.status(self.crab_folder)
522  self.log.debug( "Found state: %s" % self.state )
523  if self.state=="FAILED":
524  #try it once more
525  time.sleep(2)
526  self.state , self.jobs,self.failureReason = controller.status(self.crab_folder)
527  self.nJobs = len(self.jobs)
528  self.updateJobStats()
529  if self.state == "NOSTATE":
530  self.log.debug( "Trying to resubmit because of NOSTATE" )
531  if self.resubmitCount < 3: self.handleNoState()
532  # add to db if not
533  # Final solution if state not yet found
534  self.isUpdating = False
535  self.lastUpdate = datetime.datetime.now().strftime( "%Y-%m-%d_%H.%M.%S" )
536  #~ self.lock.release()
537 
538 
541  def handleNoState( self ):
542  crab = CrabController()
543  if "The CRAB3 server backend could not resubmit your task because the Grid scheduler answered with an error." in task.failureReason:
544  # move folder and try it again
545  cmd = 'mv %s bak_%s' %(crab._prepareFoldername( self.name ),crab._prepareFoldername( self.name ))
546  p = subprocess.Popen(cmd,stdout=subprocess.PIPE, shell=True)#,shell=True,universal_newlines=True)
547  (out,err) = p.communicate()
548  self.state = "SHEDERR"
549  configName = '%s_cfg.py' %(crab._prepareFoldername( self.name ))
550  crab.submit( configName )
551 
552  elif self.failureReason is not None:
553  self.state = "ERRHANDLE"
554  crab.resubmit( self.name )
555  self.resubmitCount += 1
556 
557  def test_print(self):
558  return self.uuid
559 
563  def updateJobStats(self,dCacheFileList = None):
564  jobKeys = sorted(self.jobs.keys())
565  try:
566  intJobkeys = [int(x) for x in jobKeys]
567  except:
568  print("error parsing job numbers to int")
569 
570  #maxjobnumber = max(intJobkeys)
571 
572  stateDict = {'unsubmitted':0,'idle':0,'running':0,'transferring':0,'cooloff':0,'failed':0,'finished':0}
573  nComplete = 0
574 
575  # loop through jobs
576  for key in jobKeys:
577  job = self.jobs[key]
578  #check if all completed files are on dCache
579  for statekey in stateDict.keys():
580  if statekey in job['State']:
581  stateDict[statekey]+=1
582  # check if output files of finished jobs are found on dCache if dCacheFileList is given
583  if dCacheFileList is not None:
584  outputFilename = "%s_%s"%( self.name, key)
585  if 'finished' in job['State'] and any(outputFilename in s for s in dCacheFileList):
586  nComplete +=1
587 
588  for state in stateDict:
589  attrname = "n" + state.capitalize()
590  setattr(self, attrname, stateDict[state])
591  self.nComplete = nComplete
592 
593 
598  def readLogArch(self, logArchName):
599  JobNumber = logArchName.split("/")[-1].split("_")[1].split(".")[0]
600  log = {'readEvents' : 0}
601  with tarfile.open( logArchName, "r") as tar:
602  try:
603  JobXmlFile = tar.extractfile('FrameworkJobReport-%s.xml' % JobNumber)
604  root = ET.fromstring( JobXmlFile.read() )
605  for child in root:
606  if child.tag == 'InputFile':
607  for subchild in child:
608  if subchild.tag == 'EventsRead':
609  nEvents = int(subchild.text)
610  log.update({'readEvents' : nEvents})
611  break
612  break
613  except:
614  print("Cannot parse / read %s" % logArchName)
615  return log
616 
617 
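# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): typical life cycle of
# a CrabTask built from an existing crab config.  The config file name is a
# hypothetical placeholder; initUpdate=False avoids contacting the CRAB server
# during construction.
# ---------------------------------------------------------------------------
def _example_crabtask_usage(config="crab_MyTask_cfg.py"):
    task = CrabTask(crab_config=config, initUpdate=False)
    task.update()                 # refresh state and per-job information
    if task.nFailed > 0:
        task.resubmit_failed()    # resubmit only the failed jobs
    return task.state, task.nFinished, task.nFailed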
620 class TaskStats:
621 
622 
628  def __init__(self, tasklist = None):
629  if tasklist is not None:
630  self.updateStats(tasklist)
631  else:
632  self.clearStats()
633 
634 
640  def updateStats(self,tasklist):
641  self.clearStats()
642  self.nTasks = len(tasklist)
643  for task in tasklist:
644  if not task.isUpdating:
645  self.nUnsubmitted += task.nUnsubmitted
646  self.nIdle += task.nIdle
647  self.nRunning += task.nRunning
648  self.nTransferring += task.nTransferring
649  self.nCooloff += task.nCooloff
650  self.nFailed += task.nFailed
651  self.nFinished += task.nFinished
652  self.nComplete += task.nComplete
653 
654 
658  def clearStats(self):
659  self.nTasks = 0
660  self.nUnsubmitted = 0
661  self.nIdle = 0
662  self.nRunning = 0
663  self.nTransferring = 0
664  self.nCooloff = 0
665  self.nFailed = 0
666  self.nFinished = 0
667  self.nComplete = 0
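# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): aggregate job counts
# over several CrabTask objects with TaskStats.  The config names are
# hypothetical placeholders.
# ---------------------------------------------------------------------------
def _example_taskstats_usage(configs=("crab_TaskA_cfg.py", "crab_TaskB_cfg.py")):
    tasks = [CrabTask(crab_config=cfg, initUpdate=False) for cfg in configs]
    for task in tasks:
        task.update()
    stats = TaskStats(tasklist=tasks)
    return stats.nTasks, stats.nRunning, stats.nFinished, stats.nFailed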
Member documentation (cleaned summary of the generated cross-references):

CrabController: The CrabController class.
 def __init__(self, debug=0, logger=None, workingArea=None, voGroup=None, username=None): The constructor.
 def checkwrite(self, site='T2_DE_RWTH', path='noPath'): Check if crab can write to the specified site.
 def submit(self, name): Submit the crab task for the given name or config.
 def resubmit(self, name, joblist=None): Resubmit all failed jobs in a task or a specified list of jobs in a task.
 def checkusername(self): Return the HN name for a user with a valid proxy.
 def status(self, name): Check crab status.
 def callCrabCommand(self, crabArgs): Call a crab command in a new process and return the result dict.
 def getlog(self, name): Call crab getlog.
 def report(self, name): Call the crab report command and return the analyzed lumis.
 def readCrabConfig(self, name): Read a crab config and return it as a python object.
 def crabFolders(self): Return a list of all crab folders in the working area (default: cwd).
 def _prepareFoldername(self, name): Add crab_ to the folder name if needed.
 def commandlineOptions(self, parser=optparse.OptionParser('usage: %prog')): Populate an existing optparse parser, or return a new one, with options for crab functions.

def crabCommandProcess(q, crabCommandArgs): Run a crab command in a new process.

CrabTask: Class for a single crab request. This class represents one crab3 task/request.
 def __init__(self, taskname="", crab_config="", crabController=None, initUpdate=True, debuglevel="ERROR", datasetpath="", localDir="", outlfn=""): The object constructor.
 def isData(self): Property to find out if the task runs on data.
 def crabConfig(self): Access the crab config object, reading it if uninitialized.
 def resubmit_failed(self): Resubmit failed jobs in the task.
 def update(self): Update the task and its associated jobs.
 def handleNoState(self): Handle a task that received the NOSTATE status.
 def updateJobStats(self, dCacheFileList=None): Update the job statistics.
 def readLogArch(self, logArchName): Read log info from a log.tar.gz archive.

TaskStats: Class holding job statistics for several crab tasks.
 def __init__(self, tasklist=None): The object constructor.
 def updateStats(self, tasklist): Update the statistics for a given task list.
 def clearStats(self): Set all counts to zero.