CMS 3D CMS Logo

mps_update.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 from builtins import range
4 import os
5 import re
6 import subprocess
7 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
8 
9 import six
10 
def fill_time_info(mps_index, status, cpu_time):
    """Fill timing info in the database for `mps_index`.

    Entries are only touched when `status` is "RUN" or "DONE":
    - a positive CPU time (rounded to whole seconds) replaces JOBRUNTIME, and
      the increase over the previously stored JOBRUNTIME is written to
      JOBINCR and, prefixed with '+', to JOBHOST;
    - otherwise JOBRUNTIME and JOBINCR are reset to 0 (JOBHOST is untouched).
    Operates on the module-level job database object `lib`.

    Arguments:
    - `mps_index`: index in the MPS database
    - `status`: job status
    - `cpu_time`: extracted CPU timing information
    """
    seconds = int(round(cpu_time))  # care only about seconds for now
    if status not in ("RUN", "DONE"):
        return

    if seconds <= 0:
        lib.JOBRUNTIME[mps_index] = 0
        lib.JOBINCR[mps_index] = 0
        return

    increment = seconds - lib.JOBRUNTIME[mps_index]
    lib.JOBRUNTIME[mps_index] = seconds
    lib.JOBHOST[mps_index] = "+" + str(increment)
    lib.JOBINCR[mps_index] = increment
30 
31 
32 
################################################################################
# mapping of HTCondor status codes (JobStatus attribute, as printed by
# condor_q/condor_history) to MPS status strings
htcondor_jobstatus = {
    "1": "PEND",  # Idle
    "2": "RUN",   # Running
    "3": "EXIT",  # Removed
    "4": "DONE",  # Completed
    "5": "PEND",  # Held
    "6": "RUN",   # Transferring output
    "7": "PEND",  # Suspended
}
42 
43 
################################################################################
# collect submitted jobs (use 'in' to handle composites, e.g. DISABLEDFETCH)
lib = mpslib.jobdatabase()
lib.read_db()

# submitted_jobs maps batch job ID -> index in the MPS database. A job counts
# as submitted unless its status contains one of the fragments below;
# substring matching also catches composite states such as DISABLEDFETCH.
submitted_jobs = {}
for index, batch_id in enumerate(lib.JOBID):
    job_state = lib.JOBSTATUS[index]
    if any(tag in job_state
           for tag in ("SETUP", "OK", "DONE", "FETCH", "ABEND", "WARN", "FAIL")):
        continue
    submitted_jobs[batch_id] = index
print("submitted jobs:", len(submitted_jobs))
59 
60 
################################################################################
# deal with submitted jobs by looking into output of shell (bjobs/condor_q)
#
# The batch systems' answers are collected in `job_status`
# (batch job ID -> {"status": MPS status, "cpu": CPU seconds}). Every job found
# there that is also in `submitted_jobs` is removed from `submitted_jobs` and
# its database entry is updated.
#
# `universal_newlines=True` makes check_output() return text (str) instead of
# bytes under Python 3; without it the str-based parsing below and the
# str-keyed lookups into `htcondor_jobstatus` / `submitted_jobs` fail.
if len(submitted_jobs) > 0:
    job_status = {}
    if "htcondor" in lib.get_class("pede") or "htcondor" in lib.get_class("mille"):
        condor_q = subprocess.check_output(["condor_q", "-af:j",
                                            "JobStatus", "RemoteSysCpu"],
                                           stderr = subprocess.STDOUT,
                                           universal_newlines = True)
        for line in condor_q.splitlines():
            fields = line.split()
            # expect "<job id> <JobStatus> <RemoteSysCpu>"; skip anything else
            # (e.g. blank lines or messages merged in from stderr)
            if len(fields) != 3: continue
            job_id, status, cpu_time = fields
            job_status[job_id] = {"status": htcondor_jobstatus[status],
                                  "cpu": float(cpu_time)}

    bjobs = subprocess.check_output(["bjobs", "-l", "-a"],
                                    stderr = subprocess.STDOUT,
                                    universal_newlines = True)
    bjobs = bjobs.replace("\n","")

    job_regex = re.compile(r"Job<(\d+?)>,")
    status_regex = re.compile(r"Status<([A-Z]+?)>")
    cputime_regex = re.compile(r"TheCPUtimeusedis(\d+(\.\d+)?)seconds")
    if bjobs != "No job found":
        # all blanks are removed (hence the blank-free regexes above), then the
        # text is split on the dashed separator (presumably between job records)
        results = bjobs.replace(" ","").split("-----------------------")
        for line in results:
            if len(line.strip()) == 0: continue
            job_match = job_regex.search(line)
            status_match = status_regex.search(line)
            # skip chunks without job ID/status instead of crashing with
            # AttributeError on a failed match
            if job_match is None or status_match is None: continue
            # extract jobID and job status
            job_id = job_match.group(1)
            status = status_match.group(1)
            # extract CPU time (only present for finished job)
            match = cputime_regex.search(line)
            cpu_time = float(match.group(1)) if match else 0
            print("out ", job_id, " ", status, " ", cpu_time)
            job_status[job_id] = {"status": status,
                                  "cpu": cpu_time}

    for job_id, job_info in six.iteritems(job_status):
        # continue with next batch job if not found or not interesting
        if job_id not in submitted_jobs:
            print("mps_update.py - the job", job_id, end=' ')
            print("was not found in the JOBID array")
            continue
        # pop entry from submitted jobs; the DB index is only used once the
        # job is known to be in the DB (no stray -1 index into JOBSTATUS)
        mps_index = submitted_jobs.pop(job_id)

        # check for disabled Jobs
        disabled = "DISABLED" if "DISABLED" in lib.JOBSTATUS[mps_index] else ""

        # if found update Joblists for mps.db
        lib.JOBSTATUS[mps_index] = disabled+job_info["status"]
        fill_time_info(mps_index, job_info["status"], job_info["cpu"])
113 
114 
################################################################################
# loop over remaining jobs to see whether they are done
#
# Iterate over a snapshot (list) of the items: entries are popped from
# `submitted_jobs` inside the loop, and on Python 3 items() is a live view,
# so changing the dict's size during iteration would raise RuntimeError.
use_htcondor = ("htcondor" in lib.get_class("pede") or
                "htcondor" in lib.get_class("mille"))
for job_id, mps_index in list(submitted_jobs.items()):
    # check if current job is disabled. Print stuff.
    disabled = "DISABLED" if "DISABLED" in lib.JOBSTATUS[mps_index] else ""
    print(" DB job ", job_id, mps_index)

    # check if job may be done by looking if a folder exists in the project directory.
    # if True -> jobstatus is set to DONE
    theBatchDirectory = "LSFJOB_"+job_id
    if os.path.isdir(theBatchDirectory):
        print("Directory ", theBatchDirectory, "exists")
        lib.JOBSTATUS[mps_index] = disabled + "DONE"
        submitted_jobs.pop(job_id)
        continue

    # check if it is a HTCondor job already moved to "history"
    if use_htcondor:
        userlog = os.path.join("jobData", lib.JOBDIR[mps_index], "HTCJOB")
        # universal_newlines=True: get str (not bytes) on Python 3 so the
        # status lookup in htcondor_jobstatus works
        condor_h = subprocess.check_output(["condor_history", job_id, "-limit", "1",
                                            "-userlog", userlog,
                                            "-af:j", "JobStatus", "RemoteSysCpu"],
                                           stderr = subprocess.STDOUT,
                                           universal_newlines = True)
        if len(condor_h.strip()) > 0:
            # the first field is the job ID printed by condor (-af:j); keep the
            # loop's `job_id`, which is the key used in submitted_jobs
            _, status, cpu_time = condor_h.split()
            status = htcondor_jobstatus[status]
            lib.JOBSTATUS[mps_index] = disabled + status
            fill_time_info(mps_index, status, float(cpu_time))
            submitted_jobs.pop(job_id)
            continue

    if "RUN" in lib.JOBSTATUS[mps_index]:
        print("WARNING: Job ", mps_index, end=' ')
        print("in state RUN, neither found by htcondor, nor bjobs, nor find", end=' ')
        print("LSFJOB directory!")
150 
151 
################################################################################
# check for orphaned jobs
#
# Report every job still left in `submitted_jobs` whose DB status contains one
# of the tags below; one report line is printed per matching tag.
for mps_index in six.itervalues(submitted_jobs):
    current_status = lib.JOBSTATUS[mps_index]
    for tag in ("SETUP", "DONE", "FETCH", "TIMEL", "SUBTD"):
        if tag not in current_status:
            continue
        print("Funny entry index", mps_index, " job", lib.JOBID[mps_index], end=' ')
        print(" status", current_status)


# write the (possibly updated) job database back
lib.write_db()
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def fill_time_info(mps_index, status, cpu_time)
Definition: mps_update.py:11
#define str(s)
double split
Definition: MVATrainer.cc:139