CMS 3D CMS Logo

mps_update.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import os
3 import re
4 import subprocess
5 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
6 
7 import six
8 
def fill_time_info(mps_index, status, cpu_time):
    """Store the CPU time of a job in the MPS database arrays.

    Only jobs whose `status` is "RUN" or "DONE" are touched. For a positive
    CPU time the run time is replaced, and the increment with respect to the
    previously stored run time is written to JOBINCR and, as a string with a
    leading "+", to JOBHOST. For a non-positive CPU time the run time and the
    increment are reset to zero.

    Arguments:
    - `mps_index`: index in the MPS database
    - `status`: job status
    - `cpu_time`: extracted CPU timing information
    """

    # only whole seconds are kept
    seconds = int(round(cpu_time))

    if status not in ("RUN", "DONE"):
        return

    if seconds <= 0:
        lib.JOBRUNTIME[mps_index] = 0
        lib.JOBINCR[mps_index] = 0
        return

    increment = seconds - lib.JOBRUNTIME[mps_index]
    lib.JOBRUNTIME[mps_index] = seconds
    lib.JOBHOST[mps_index] = "+" + str(increment)
    lib.JOBINCR[mps_index] = increment
28 
29 
30 
31 ################################################################################
32 # mapping of HTCondor status codes to MPS status
# keys are the HTCondor `JobStatus` codes as strings (as printed by
# condor_q / condor_history), values are the corresponding MPS states
htcondor_jobstatus = {
    "1": "PEND",  # HTCondor: Idle
    "2": "RUN",   # HTCondor: Running
    "3": "EXIT",  # HTCondor: Removed
    "4": "DONE",  # HTCondor: Completed
    "5": "PEND",  # HTCondor: Held
    "6": "RUN",   # HTCondor: Transferring output
    "7": "PEND",  # HTCondor: Suspended
}
40 
41 
42 ################################################################################
43 # collect submitted jobs (use 'in' to handle composites, e.g. DISABLEDFETCH)
44 lib = mpslib.jobdatabase()
45 lib.read_db()
46 
47 submitted_jobs = {}
48 for i in xrange(len(lib.JOBID)):
49  submitted = True
50  for status in ("SETUP", "OK", "DONE", "FETCH", "ABEND", "WARN", "FAIL"):
51  if status in lib.JOBSTATUS[i]:
52  submitted = False
53  break
54  if submitted:
55  submitted_jobs[lib.JOBID[i]] = i
56 print "submitted jobs:", len(submitted_jobs)
57 
58 
59 ################################################################################
60 # deal with submitted jobs by looking into output of shell (bjobs/condor_q)
61 if len(submitted_jobs) > 0:
62  job_status = {}
63  if "htcondor" in lib.get_class("pede"):
64  condor_q = subprocess.check_output(["condor_q", "-af:j",
65  "JobStatus", "RemoteSysCpu"],
66  stderr = subprocess.STDOUT)
67  for line in condor_q.splitlines():
68  job_id, status, cpu_time = line.split()
69  job_status[job_id] = {"status": htcondor_jobstatus[status],
70  "cpu": float(cpu_time)}
71 
72  bjobs = subprocess.check_output(["bjobs", "-l", "-a"],
73  stderr = subprocess.STDOUT)
74  bjobs = bjobs.replace("\n","")
75 
76  job_regex = re.compile(r"Job<(\d+?)>,")
77  status_regex = re.compile(r"Status<([A-Z]+?)>")
78  cputime_regex = re.compile(r"TheCPUtimeusedis(\d+(\.\d+)?)seconds")
79  if bjobs != "No job found":
80  results = bjobs.replace(" ","").split("-----------------------")
81  for line in results:
82  if len(line.strip()) == 0: continue
83  # extract jobID
84  job_id = job_regex.search(line).group(1)
85  # extract job status
86  status = status_regex.search(line).group(1)
87  # extract CPU time (only present for finished job)
88  match = cputime_regex.search(line)
89  cpu_time = float(match.group(1)) if match else 0
90  print "out ", job_id, " ", status, " ", cpu_time
91  job_status[job_id] = {"status": status,
92  "cpu": cpu_time}
93 
94  for job_id, job_info in six.iteritems(job_status):
95  mps_index = submitted_jobs.get(job_id, -1)
96  # check for disabled Jobs
97  disabled = "DISABLED" if "DISABLED" in lib.JOBSTATUS[mps_index] else ""
98 
99  # continue with next batch job if not found or not interesting
100  if mps_index == -1:
101  print "mps_update.py - the job", job_id,
102  print "was not found in the JOBID array"
103  continue
104  else: # pop entry from submitted jobs
105  submitted_jobs.pop(job_id)
106 
107 
108  # if found update Joblists for mps.db
109  lib.JOBSTATUS[mps_index] = disabled+job_info["status"]
110  fill_time_info(mps_index, job_info["status"], job_info["cpu"])
111 
112 
113 ################################################################################
114 # loop over remaining jobs to see whether they are done
115 for job_id, mps_index in submitted_jobs.items(): # IMPORTANT to copy here (no iterator!)
116  # check if current job is disabled. Print stuff.
117  disabled = "DISABLED" if "DISABLED" in lib.JOBSTATUS[mps_index] else ""
118  print " DB job ", job_id, mps_index
119 
120  # check if job may be done by looking if a folder exists in the project directory.
121  # if True -> jobstatus is set to DONE
122  theBatchDirectory = "LSFJOB_"+job_id
123  if os.path.isdir(theBatchDirectory):
124  print "Directory ", theBatchDirectory, "exists"
125  lib.JOBSTATUS[mps_index] = disabled + "DONE"
126  submitted_jobs.pop(job_id)
127  continue
128 
129  # check if it is a HTCondor job already moved to "history"
130  elif "htcondor" in lib.get_class("pede"):
131  userlog = os.path.join("jobData", lib.JOBDIR[mps_index], "HTCJOB")
132  condor_h = subprocess.check_output(["condor_history", job_id, "-limit", "1",
133  "-userlog", userlog,
134  "-af:j", "JobStatus", "RemoteSysCpu"],
135  stderr = subprocess.STDOUT)
136  if len(condor_h.strip()) > 0:
137  job_id, status, cpu_time = condor_h.split()
138  status = htcondor_jobstatus[status]
139  lib.JOBSTATUS[mps_index] = disabled + status
140  fill_time_info(mps_index, status, float(cpu_time))
141  submitted_jobs.pop(job_id)
142  continue
143 
144  if "RUN" in lib.JOBSTATUS[mps_index]:
145  print "WARNING: Job ", mps_index,
146  print "in state RUN, neither found by htcondor, nor bjobs, nor find",
147  print "LSFJOB directory!"
148 
149 
150 ################################################################################
151 # check for orphaned jobs
152 for job_id, mps_index in six.iteritems(submitted_jobs):
153  for status in ("SETUP", "DONE", "FETCH", "TIMEL", "SUBTD"):
154  if status in lib.JOBSTATUS[mps_index]:
155  print "Funny entry index", mps_index, " job", lib.JOBID[mps_index],
156  print " status", lib.JOBSTATUS[mps_index]
157 
158 
159 lib.write_db()
def fill_time_info(mps_index, status, cpu_time)
Definition: mps_update.py:9
#define str(s)
double split
Definition: MVATrainer.cc:139