
crabFunctions.py
#!/usr/bin/env python

from __future__ import print_function
import os,sys,glob
import tarfile
import xml.etree.ElementTree as ET
import imp
import json
import optparse
import subprocess
import logging
import datetime
import uuid
import time
from httplib import HTTPException
from multiprocessing import Process, Queue

from CRABAPI.RawCommand import crabCommand
from CRABClient.UserUtilities import getConsoleLogLevel, setConsoleLogLevel
from CRABClient.ClientUtilities import LOGLEVEL_MUTE
from CRABClient.ClientExceptions import CachefileNotFoundException
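
# Helper classes around the CRAB3 client: CrabController wraps crabCommand
# calls (run in separate processes), CrabTask represents a single CRAB3
# task/request, and TaskStats aggregates job counts over several tasks.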

## The CrabController class
class CrabController:

    def __init__(self, debug=0, logger=None, workingArea=None, voGroup=None, username=None):

        setConsoleLogLevel(LOGLEVEL_MUTE)
        self.debug = debug
        if workingArea is not None:
            self.workingArea = workingArea
        else:
            self.workingArea = os.getcwd()
        self.dry_run = False
        if voGroup is not None:
            self.voGroup = voGroup
        else:
            self.voGroup = "dcms"
        if username is not None:
            self.username = username
        else:
            self.username = None

        if logger is not None:
            self.logger = logger.getChild("CrabController")
        else:
            # add instance logger as logger to root
            self.logger = logging.getLogger("CrabController")
            # check if handlers are present for the root logger;
            # we assume that the default logging is not configured
            # if a handler is present
            if len(logging.getLogger().handlers) < 1:
                ch = logging.FileHandler('crabController.log', mode='a', encoding=None, delay=False)
                ch.setLevel(logging.DEBUG)
                # create formatter
                formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
                # add formatter to ch
                ch.setFormatter(formatter)
                self.logger.addHandler(ch)

        self.crab_q = Queue()

    def checkwrite(self, site='T2_DE_RWTH', path='noPath'):
        if self.username is None: self.checkusername()
        try:
            self.logger.info("Checking if user can write to /store/user/%s on site %s with voGroup %s" % (self.username, site, self.voGroup))
            if 'noPath' not in path:
                res = crabCommand('checkwrite', '--site', site, '--voGroup', self.voGroup, '--lfn', path)
            else:
                res = crabCommand('checkwrite', '--site', site, '--voGroup', self.voGroup)
            if res['status'] == 'SUCCESS':
                self.logger.info("Checkwrite was successfully called.")
                return True
            else:
                self.logger.error("The crab checkwrite command failed for site: %s" % site)
                return False
        except:
            self.logger.error('Unable to perform crab checkwrite')
            return False

    def submit(self, name):
        if self.dry_run:
            res = self.callCrabCommand(('submit', '--dryrun', name))
            self.logger.info('Dry-run: You may check the created config and sandbox')
        else:
            res = self.callCrabCommand(('submit', '--wait', name))
            self.logger.info("crab submit called for task %s" % name)
            if self.debug > 1:
                self.logger.info(str(res))
        return res

    def resubmit(self, name, joblist=None):
        if self.dry_run:
            self.logger.info('Dry-run: Created config file.')
            return {}
        # resubmission of an explicit list of job ids is currently disabled:
        #~ if joblist is not None:
        #~     jobstring = "%s" % ','.join(joblist)
        #~     cmd = ('resubmit', '--wait', '--jobids=', jobstring, os.path.join(self.workingArea, self._prepareFoldername(name)))
        cmd = ('resubmit', '--wait', os.path.join(self.workingArea, self._prepareFoldername(name)))
        res = self.callCrabCommand(cmd)
        self.logger.info("crab resubmit called for task %s" % name)
        return res

    def checkusername(self):
        # deprecated command: cmd = 'crab checkHNname --voGroup=dcms'
        #~ cmd = 'crab checkusername --voGroup=dcms'
        try:
            self.username = os.environ["CERNUSERNAME"]
            return self.username
        except: pass
        res = crabCommand('checkusername')
        try:
            self.username = res['username']
            return res['username']
        except:
            return "noHNname"

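    # status() returns a (state, jobs, failureReason) triple: the overall
    # task state string, the per-job dictionary from 'crab status --long',
    # and the task failure message (or None if there is none).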
    def status(self, name):
        if self.dry_run:
            self.logger.info('Dry-run: Created config file. crab status call skipped for task %s' % name)
            return "NOSTATE", {}, None
        try:
            if "crab_" not in name:
                callname = "crab_" + name
            else:
                callname = name
            res = self.callCrabCommand(('status', '--long', callname))
            if 'taskFailureMsg' in res and 'jobs' in res:
                return res['status'], res['jobs'], res['taskFailureMsg']
            elif 'jobs' in res and 'taskFailureMsg' not in res:
                return res['status'], res['jobs'], None
            elif 'jobs' not in res and 'taskFailureMsg' in res:
                return res['status'], {}, res['taskFailureMsg']
            else:
                return res['status'], {}, None
        except Exception as e:
            print(e)
            self.logger.error("Can not run crab status request")
            return "NOSTATE", {}, None

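    # Each crabCommand call is executed in a separate process (see
    # crabCommandProcess below), which isolates the CRAB client call from the
    # main program; the result dict is passed back through self.crab_q.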
    def callCrabCommand(self, crabArgs):
        crabCommandProcessArgs = (self.crab_q, crabArgs)
        p = Process(target=crabCommandProcess, args=crabCommandProcessArgs)
        p.start()
        res = self.crab_q.get()
        p.join()
        return res

    def getlog(self, name):
        foldername = self._prepareFoldername(name)
        try:
            #res = crabCommand('--quiet','status', dir = 'crab_%s' % name)
            res = self.callCrabCommand(('getlog', '%s' % foldername))
            return res['success'], res['failed']
        except:
            self.logger.error("Error calling crab getlog for %s" % foldername)
            return {}, {}

    def report(self, name):
        foldername = self._prepareFoldername(name)
        try:
            res = self.callCrabCommand(('report', '%s' % foldername))
            return res['analyzedLumis']
        except:
            self.logger.error("Error calling crab report for %s" % foldername)

    def readCrabConfig(self, name):
        try:
            if os.path.exists(name):
                pset = name
            else:
                pset = 'crab_%s_cfg.py' % name
            with open(pset, 'r') as cfgfile:
                cfo = imp.load_source("pycfg", pset, cfgfile)
                config = cfo.config
                del cfo
            return config
        except:
            return False

    @property
    def crabFolders(self):
        dirlist = [x for x in os.listdir(self.workingArea) if (x.startswith('crab_') and os.path.isdir(os.path.join(self.workingArea, x)))]
        return dirlist

    def _prepareFoldername(self, name):
        if name.startswith("crab_"):
            crabfolder = '%s' % name
        else:
            crabfolder = "crab_%s" % name
        return crabfolder.strip()

    def commandlineOptions(self, parser=optparse.OptionParser('usage: %prog')):
        # we first need to call parse_args with a dummy string at the beginning to
        # check for existing options later
        (currentoptions, args) = parser.parse_args([" "])

        # The following block shows how variables should be added where
        # conflicts are possible if the option is overridden by this function;
        # in that case they raise a ValueError
        #try:
        #    parser.add_option( '--someOption',metavar='DIR',default=None,
        #        help='Dummy option for future integration')
        #except OptionConflictError as e:
        #    conditionalLog(crablog,"There are conflicts extending the optparse options object",'error')
        #    conditionalLog(crablog,e.strerror,'error')

        # options where it is checked if they exist; new options are added
        # otherwise
        if not hasattr(currentoptions, 'dry_run'):
            parser.add_option('--dry-run', action='store_true', default=False,
                help='Do everything except calling CRAB or registering samples to the database.')
        if not hasattr(currentoptions, 'workingArea'):
            parser.add_option('--workingArea', metavar='DIR', default=os.getcwd(),
                help='The area (full or relative path) where to create the CRAB project directory. '
                     'If the area doesn\'t exist, CRAB will try to create it using the mkdir command'
                     ' (without -p option). Defaults to the current working directory.')

        # Some options can be added without expected trouble with other parser
        # parts, simply because it is quite fixed what they represent.
        # Those variables should be added here and will throw no exception if
        # they already exist in the parser.
        #parser.set_conflict_handler('resolve')
        #parser.add_option( '--someOption',metavar='DIR',default=None,
        #    help='Dummy option for future integration')

        return parser

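# A minimal usage sketch for CrabController (assuming a valid VOMS proxy and
# an existing CRAB config file; the file and task names are illustrative only):
#
#   controller = CrabController(voGroup="dcms")
#   if controller.checkwrite(site='T2_DE_RWTH'):
#       controller.submit('crab_MyTask_cfg.py')
#       state, jobs, reason = controller.status('MyTask')
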
def crabCommandProcess(q, crabCommandArgs):
    # give crab3 the chance for one server glitch
    i = 0
    while True:
        i += 1
        try:
            res = crabCommand(*crabCommandArgs)
            break
        except HTTPException as e:
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            print("will try again!")
            time.sleep(5)
        except CachefileNotFoundException as e:
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            print(crabCommandArgs)
            res = {'status': "CachefileNotFound", 'jobs': {}}
            break
        if i > 5:
            res = {'status': "UnexpectedError", 'jobs': {}}
            break
    q.put(res)

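# CertInfo parses the FQAN reported by 'voms-proxy-info --fqan', e.g. a line
# such as "/cms/dcms/Role=NULL/Capability=NULL" (example for illustration only),
# to extract the VO, group, and role of the current proxy.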
class CertInfo:
    def __init__(self):
        p = subprocess.Popen("voms-proxy-info --fqan",
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             shell=True)
        stdout, stderr = p.communicate()
        print(stdout)
        if p.returncode != 0:
            self.vo = ""
            self.voGroup = ""
            self.voRole = ""
        else:
            lines = stdout.split("\n")
            splitline = lines[0].split("/")
            if len(splitline) < 4:
                splitline = lines[1].split("/")
            self.vo = splitline[1]
            self.voGroup = splitline[2]
            try:
                self.voRole = splitline[2].split("=")[1]
                if "NULL" in self.voRole:
                    self.voGroup = ""
            except:
                self.voRole = ""


## Class for a single CrabRequest
#
# This class represents one crab3 task/request.
class CrabTask:

    def __init__(self,
                 taskname="",
                 crab_config="",
                 crabController=None,
                 initUpdate=True,
                 debuglevel="ERROR",
                 datasetpath="",
                 localDir="",
                 outlfn=""):

        # crab config as a python object should only be used via .config
        self._crabConfig = None

        self._crabFolder = None

        if taskname:
            self.name = taskname
        else:
            if not crab_config:
                raise ValueError("Either taskname or crab_config needs to be set")
            if not os.path.exists(crab_config):
                raise IOError("File %s not found" % crab_config)
            self.name = crab_config
            self.name = self.crabConfig.General.requestName
        self.uuid = uuid.uuid4()
        #~ self.lock = multiprocessing.Lock()
        #setup logging
        self.log = logging.getLogger('crabTask')
        self.log.setLevel(logging._levelNames[debuglevel])
        self.jobs = {}
        self.localDir = localDir
        self.outlfn = outlfn
        self.isUpdating = False
        self.taskId = -1
        #variables for statistics
        self.nJobs = 0
        self.state = "NOSTATE"
        self.maxjobnumber = 0
        self.nUnsubmitted = 0
        self.nIdle = 0
        self.nRunning = 0
        self.nTransferring = 0
        self.nCooloff = 0
        self.nFailed = 0
        self.nFinished = 0
        self.nComplete = 0
        self.failureReason = None
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

        self._isData = None
        self.resubmitCount = 0

        self.debug = False

        self.finalFiles = []
        self.totalEvents = 0

        self._datasetpath_default = datasetpath

        #start with first updates
        if initUpdate:
            self.update()
            self.updateJobStats()

    @property
    def isData(self):
        if self._isData is None:
            try:
                test = self.crabConfig.Data.lumiMask
                self._isData = True
            except:
                if self.name.startswith("Data_"):
                    self._isData = True
                else:
                    self._isData = False
        return self._isData

    @property
    def crabConfig(self):
        if self._crabConfig is None:
            crab = CrabController()
            self._crabConfig = crab.readCrabConfig(self.name)
        return self._crabConfig

    @property
    def datasetpath(self):
        try:
            return self.crabConfig.Data.inputDataset
        except:
            pass
        return self._datasetpath_default

    @property
    def crabFolder(self):
        if self._crabFolder is not None: return self._crabFolder
        crab = CrabController()
        if os.path.exists(os.path.join(self.crabConfig.General.workArea, crab._prepareFoldername(self.name))):
            self._crabFolder = os.path.join(self.crabConfig.General.workArea, crab._prepareFoldername(self.name))
            return self._crabFolder
        alternative_path = os.path.join(os.getcwd(), crab._prepareFoldername(self.name))
        if os.path.exists(alternative_path):
            self._crabFolder = alternative_path
            return self._crabFolder
        self.log.error("Unable to find folder for Task")
        return ""

    def resubmit_failed(self):
        failedJobIds = []
        controller = CrabController()
        for jobkey in self.jobs.keys():
            job = self.jobs[jobkey]
            if job['State'] == 'failed':
                failedJobIds.append(job['JobIds'][-1])
        controller.resubmit(self.name, joblist=failedJobIds)
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

    @property
    def crab_folder(self):
        return os.path.join(self.crabConfig.General.workArea,
                            "crab_" + self.crabConfig.General.requestName)

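    # update() queries the current crab status for this task, refreshes the
    # per-state job counters, and, if no state could be retrieved (NOSTATE),
    # hands the task over to handleNoState() for up to three resubmission
    # attempts.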
    def update(self):
        #~ self.lock.acquire()
        self.log.debug("Start update for task %s" % self.name)
        self.isUpdating = True
        controller = CrabController()
        self.state = "UPDATING"
        # check if we should drop this sample due to missing info

        self.log.debug("Try to get status for task")
        self.state, self.jobs, self.failureReason = controller.status(self.crab_folder)
        self.log.debug("Found state: %s" % self.state)
        if self.state == "FAILED":
            #try it once more
            time.sleep(2)
            self.state, self.jobs, self.failureReason = controller.status(self.crab_folder)
        self.nJobs = len(self.jobs)
        self.updateJobStats()
        if self.state == "NOSTATE":
            self.log.debug("Trying to resubmit because of NOSTATE")
            if self.resubmitCount < 3: self.handleNoState()
        # add to db if not
        # Final solution if state not yet found
        self.isUpdating = False
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
        #~ self.lock.release()

    def handleNoState(self):
        crab = CrabController()
        if "The CRAB3 server backend could not resubmit your task because the Grid scheduler answered with an error." in self.failureReason:
            # move folder and try it again
            cmd = 'mv %s bak_%s' % (crab._prepareFoldername(self.name), crab._prepareFoldername(self.name))
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
            (out, err) = p.communicate()
            self.state = "SHEDERR"
            configName = '%s_cfg.py' % (crab._prepareFoldername(self.name))
            crab.submit(configName)

        elif self.failureReason is not None:
            self.state = "ERRHANDLE"
            crab.resubmit(self.name)
            self.resubmitCount += 1

    def test_print(self):
        return self.uuid

    def updateJobStats(self, dCacheFileList=None):
        jobKeys = sorted(self.jobs.keys())
        try:
            intJobkeys = [int(x) for x in jobKeys]
        except:
            print("error parsing job numbers to int")

        #maxjobnumber = max(intJobkeys)

        stateDict = {'unsubmitted': 0, 'idle': 0, 'running': 0, 'transferring': 0, 'cooloff': 0, 'failed': 0, 'finished': 0}
        nComplete = 0

        # loop through jobs
        for key in jobKeys:
            job = self.jobs[key]
            for statekey in stateDict.keys():
                if statekey in job['State']:
                    stateDict[statekey] += 1
                    # check if finished files are found on dCache if dCacheFileList is given
                    if dCacheFileList is not None and 'finished' in statekey:
                        outputFilename = "%s_%s" % (self.name, key)
                        if any(outputFilename in s for s in dCacheFileList):
                            nComplete += 1

        for state in stateDict:
            attrname = "n" + state.capitalize()
            setattr(self, attrname, stateDict[state])
        self.nComplete = nComplete

    def readLogArch(self, logArchName):
        JobNumber = logArchName.split("/")[-1].split("_")[1].split(".")[0]
        log = {'readEvents': 0}
        with tarfile.open(logArchName, "r") as tar:
            try:
                JobXmlFile = tar.extractfile('FrameworkJobReport-%s.xml' % JobNumber)
                root = ET.fromstring(JobXmlFile.read())
                for child in root:
                    if child.tag == 'InputFile':
                        for subchild in child:
                            if subchild.tag == 'EventsRead':
                                nEvents = int(subchild.text)
                                log.update({'readEvents': nEvents})
                                break
                        break
            except:
                print("Can not parse / read %s" % logArchName)
        return log

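# A minimal usage sketch for CrabTask (the config file name is hypothetical,
# for illustration only):
#
#   task = CrabTask(crab_config="crab_MyTask_cfg.py")
#   task.update()
#   print(task.state, task.nFinished, task.nFailed)
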

## Class that holds job statistics for several crab tasks
class TaskStats:

    def __init__(self, tasklist=None):
        if tasklist is not None:
            self.updateStats(tasklist)
        else:
            self.clearStats()

    def updateStats(self, tasklist):
        self.clearStats()
        self.nTasks = len(tasklist)
        for task in tasklist:
            if not task.isUpdating:
                self.nUnsubmitted += task.nUnsubmitted
                self.nIdle += task.nIdle
                self.nRunning += task.nRunning
                self.nTransferring += task.nTransferring
                self.nCooloff += task.nCooloff
                self.nFailed += task.nFailed
                self.nFinished += task.nFinished
                self.nComplete += task.nComplete

    def clearStats(self):
        self.nTasks = 0
        self.nUnsubmitted = 0
        self.nIdle = 0
        self.nRunning = 0
        self.nTransferring = 0
        self.nCooloff = 0
        self.nFailed = 0
        self.nFinished = 0
        self.nComplete = 0
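
# A minimal usage sketch for TaskStats (illustrative only):
#
#   tasks = [CrabTask(crab_config=cfg) for cfg in glob.glob('crab_*_cfg.py')]
#   stats = TaskStats(tasks)
#   print(stats.nTasks, stats.nRunning, stats.nFinished)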

Member documentation (brief descriptions)

class CrabController
    The CrabController class.
    def __init__(self, debug=0, logger=None, workingArea=None, voGroup=None, username=None)
        The constructor.
    def checkwrite(self, site='T2_DE_RWTH', path='noPath')
        Check if crab can write to the specified site.
    def submit(self, name)
        Submit a task to crab.
    def resubmit(self, name, joblist=None)
        Resubmit all failed jobs in a task or a specified list of jobs in a task.
    def checkusername(self)
        Return the HN name for a user with a valid proxy.
    def status(self, name)
        Check crab status.
    def callCrabCommand(self, crabArgs)
        Call a crab command in a new process and return the result dict.
    def getlog(self, name)
        Call crab getlog.
    def report(self, name)
        Call the crab report command and return the analyzed lumis.
    def readCrabConfig(self, name)
        Read a crab config and return it as a python object.
    def crabFolders(self)
        Return a list of all crab folders in the working area (default cwd).
    def _prepareFoldername(self, name)
        Add crab_ to the folder name if needed.
    def commandlineOptions(self, parser=optparse.OptionParser('usage: %prog'))
        Populate an existing optparse parser, or return a new one, with options for crab functions.

def crabCommandProcess(q, crabCommandArgs)
    Run a crab command in a new process.

class CrabTask
    Class for a single CrabRequest; represents one crab3 task/request.
    def __init__(self, taskname="", crab_config="", crabController=None, initUpdate=True, debuglevel="ERROR", datasetpath="", localDir="", outlfn="")
        The constructor.
    def isData(self)
        Property to find out if the task runs on data.
    def crabConfig(self)
        Access the crab config object, reading it if uninitialized.
    def update(self)
        Update the task and its associated jobs.
    def handleNoState(self)
        Handle a task that received NOSTATE status.
    def resubmit_failed(self)
        Resubmit failed jobs in the task.
    def updateJobStats(self, dCacheFileList=None)
        Update the job statistics.
    def readLogArch(self, logArchName)
        Read log info from log.tar.gz.

class TaskStats
    Class that holds job statistics for several crab tasks.
    def __init__(self, tasklist=None)
        The constructor.
    def updateStats(self, tasklist)
        Update the statistics for a given tasklist.
    def clearStats(self)
        Set all counts to zero.