CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
WorkFlowRunner.py
Go to the documentation of this file.
1 
2 from threading import Thread
3 
4 from Configuration.PyReleaseValidation import WorkFlow
5 import os,time
6 import shutil
7 from subprocess import Popen
8 from os.path import exists, basename, join
9 from os import getenv
10 from datetime import datetime
11 from hashlib import sha1
12 import urllib2, base64, json, re
13 from socket import gethostname
14 
15 # This is used to report results of the runTheMatrix to the elasticsearch
16 # instance used for IBs. This way we can track progress even if the logs are
17 # not available.
def _esParseLog(text):
    """Pull the fatal-exception text and the %MSG-e error blocks out of a log.

    Returns a tuple ``(exception, errors)``: ``exception`` is the
    concatenation of every line found between the "Begin Fatal Exception"
    and "End Fatal Exception" markers, ``errors`` is a list of
    ``{"error": <text>, "kind": <header prefix>}`` dictionaries, one per
    ``%MSG-e`` block.
    """
    # "kind" is whatever precedes the " <word> <hh:mm:ss> <zone>" part of the
    # header. Both CET and CEST are accepted; the CET-only pattern would not
    # match a CEST timestamp.
    kindSplitter = re.compile(r" [0-9a-zA-Z-]* [0-9:]{8} CES?T")
    exception = ""
    errors = []
    error = ""
    error_kind = ""
    inException = False
    inError = False
    for l in text.split("\n"):
        if l.startswith("----- Begin Fatal Exception"):
            inException = True
            continue
        if l.startswith("----- End Fatal Exception"):
            inException = False
            continue
        if l.startswith("%MSG-e"):
            if inError:
                # The previous block was never closed: keep it rather than
                # overwriting it with the new one.
                errors.append({"error": error, "kind": error_kind})
            inError = True
            error = l
            error_kind = kindSplitter.split(error)[0].replace("%MSG-e ", "")
            continue
        if inError and l.startswith("%MSG"):
            inError = False
            errors.append({"error": error, "kind": error_kind})
            error = ""
            error_kind = ""
            continue
        if inException:
            exception += l + "\n"
        if inError:
            error += l + "\n"
    if inError:
        # Log ended while an error block was still open; do not lose it.
        errors.append({"error": error, "kind": error_kind})
    return exception, errors


def esReportWorkflow(**kwds):
    """Send the state of one workflow step to elasticsearch (best effort).

    The keyword arguments form the JSON document that is uploaded. The keys
    "release", "architecture", "workflow" and "step" are required because
    they are hashed into the document id. An optional "log_file" key is
    removed from the document; if that file exists its content is uploaded
    as "message" and scanned for a fatal exception ("exception") and for
    %MSG-e blocks ("errors"). "hostname", and for releases whose name
    contains "_201" also "release_queue" and "release_date", are added.

    The target host is read from the ES_HOSTNAME environment variable and
    optional basic-auth credentials from ES_AUTH. Nothing is done when
    ES_HOSTNAME is not set, and HTTP or connection errors are printed
    instead of being raised. Always returns None.
    """
    es_hostname = getenv("ES_HOSTNAME")
    es_auth = getenv("ES_AUTH")
    # Without a hostname there is nothing to contact, whatever ES_AUTH says.
    # Credentials stay optional: they are only used when present (see below).
    if not es_hostname:
        return
    payload = kwds
    sha1_id = sha1(kwds["release"] + kwds["architecture"] + kwds["workflow"] + str(kwds["step"])).hexdigest()
    d = datetime.now()
    if "_201" in kwds["release"]:
        # The release name is split at "_201": what precedes it is the
        # queue, what follows (with "201" put back) is parsed as the date.
        datepart = "201" + kwds["release"].split("_201")[1]
        d = datetime.strptime(datepart, "%Y-%m-%d-%H00")
        payload["release_queue"] = kwds["release"].split("_201")[0]
        payload["release_date"] = d.strftime("%Y-%m-%d-%H00")

    # Parse log file to look for exceptions, errors and warnings.
    logFile = payload.pop("log_file", "")
    if exists(logFile):
        with open(logFile) as logHandle:
            lines = logHandle.read()
        payload["message"] = lines
        exception, errors = _esParseLog(lines)
        if exception:
            payload["exception"] = exception
        if errors:
            payload["errors"] = errors

    payload["hostname"] = gethostname()
    # The index name is derived from the year and week number of `d`.
    url = "https://%s/ib-matrix.%s/runTheMatrix-data/%s" % (es_hostname,
                                                            d.strftime("%Y-%W-1"),
                                                            sha1_id)
    request = urllib2.Request(url)
    if es_auth:
        base64string = base64.encodestring(es_auth).replace('\n', '')
        request.add_header("Authorization", "Basic %s" % base64string)
    request.get_method = lambda: 'PUT'
    data = json.dumps(payload)
    try:
        result = urllib2.urlopen(request, data=data)
    except urllib2.URLError as e:
        # HTTPError is a subclass of URLError, so this covers both a bad
        # HTTP status and an unreachable server. Reporting must never take
        # the workflow down.
        print(e)
        return
    try:
        print(result.read())
    except Exception as e:
        # Reading the reply is purely informational; report and move on.
        print(e)
    finally:
        result.close()
88 
class WorkFlowRunner(Thread):
    """Thread that executes the steps of one workflow in its own directory.

    The workflow object ``wf`` must provide ``numId``, ``nameId`` and
    ``cmds``. Each entry of ``cmds`` is either a command string or an object
    offering ``location``, ``lumiRanges()`` and ``das(options)``, which is
    used to produce the input file list for the following step.

    After ``run()`` the per-step results are available in ``npass``,
    ``nfail``, ``stat`` and ``retStep`` (one entry per step) and a one-line
    summary in ``report``.
    """

    def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1):
        """Store the workflow and the run options.

        noRun      -- append ' --no_exec' to every command-string step
        dryRun     -- log the commands without executing them
        cafVeto    -- refuse input steps located on 'CAF' when not on CAF
        dasOptions -- passed to the ``das()`` method of input steps
        jobReport  -- add a ' --suffix "-j JobReport<N>.xml " ' option
        nThreads   -- add ' --nThreads <n>' when greater than 1
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report = ''
        # npass/nfail start as counters and become per-step lists in run().
        self.nfail = 0
        self.npass = 0
        # Defined here so they can be read even if run() was never called.
        self.stat = []
        self.retStep = []
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.dasOptions = dasOptions
        self.jobReport = jobReport
        self.nThreads = nThreads

        self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId
        return

    def doCmd(self, cmd):
        """Log ``cmd`` and, unless in dry-run mode, execute it through a shell.

        The message is printed and appended to ``<wfDir>/cmdLog``. Returns 0
        in dry-run mode, otherwise the raw status reported by ``os.waitpid``
        (not the plain exit code); ``run()`` compares against that raw value.
        """
        msg = "\n# in: " + os.getcwd()
        if self.dryRun:
            msg += " dryRun for '"
        else:
            msg += " going to execute "
        msg += cmd.replace(';', '\n')
        print(msg)

        with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
            cmdLog.write(msg + '\n')

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing  %s ret= %s" % (cmd, ret))

        return ret

    @staticmethod
    def _logRedirect(istep, tag):
        """Return the shell redirection sending output to step<istep>_<tag>.log."""
        return ' > %s 2>&1; ' % ('step%d_' % (istep,) + tag + '.log ',)

    @staticmethod
    def _hasDasFiles(dasOutputPath):
        """Tell whether the DAS output exists and lists at least one file.

        Only lines starting with "/" count, because a failed query may still
        print unrelated text.
        """
        if not exists(dasOutputPath):
            return False
        with open(dasOutputPath) as dasFile:
            return any(l.startswith("/") for l in dasFile.read().split("\n"))

    def _markNotRun(self):
        """Record the current step as not executed."""
        self.npass.append(0)
        self.nfail.append(0)
        self.retStep.append(0)
        self.stat.append('NOTRUN')

    def _recordOutcome(self, retStep, isInputOk):
        """Store the result of an executed step; return True if it failed."""
        self.retStep.append(retStep)
        if retStep == 0:
            self.npass.append(1)
            self.nfail.append(0)
            self.stat.append('PASSED')
            return False
        self.npass.append(0)
        self.nfail.append(1)
        if retStep == 32000:
            # doCmd() value that is interpreted as a timeout.
            self.stat.append('TIMEOUT')
        elif not isInputOk:
            self.stat.append("DAS_ERROR")
        else:
            self.stat.append('FAILED')
        return True

    def _driverCommand(self, com, istep, preamble, inFile, lumiRangeFile):
        """Build the full shell command line for a command-string step."""
        # chaining IO , which should be done in WF object already and not
        # using stepX.root but <stepName>.root
        cmd = preamble + com
        if self.noRun:
            cmd += ' --no_exec'
        if inFile:  # previous step was a DAS query (either filelist of das:)
            cmd += ' --filein ' + inFile
        if lumiRangeFile:  # DAS query can also restrict lumi range
            cmd += ' --lumiToProcess ' + lumiRangeFile
        # 134 is an existing workflow where harvesting has to operate on
        # AlcaReco and NOT on DQM; hard-coded..
        if 'HARVESTING' in cmd and self.wf.numId != 134 and '--filein' not in cmd:
            cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
        else:
            if istep != 1 and '--filein' not in cmd:
                cmd += ' --filein file:step%s.root ' % (istep - 1,)
            if '--fileout' not in com:
                cmd += ' --fileout file:step%s.root ' % (istep,)
        if self.jobReport:
            cmd += ' --suffix "-j JobReport%s.xml " ' % istep
        if self.nThreads > 1:
            cmd += ' --nThreads %s' % self.nThreads
        cmd += self._logRedirect(istep, self.wf.nameId)
        return cmd

    def run(self):
        """Execute all steps of the workflow and fill in the result fields.

        The workflow directory is created (or wiped and recreated unless in
        dry-run mode). Steps are executed in order; after the first step
        that fails or times out, the remaining ones are recorded as NOTRUN.
        Command-string steps are reported to elasticsearch before they
        start, and every executed step is reported once it has finished.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun:
            # clean up to allow re-running in the same overall devel area,
            # then recreate the dir to make sure it exists
            print("cleaning up  %s  in  %s" % (self.wfDir, os.getcwd()))
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        preamble = 'cd ' + self.wfDir + '; '

        realstarttime = datetime.now()
        startime = 'date %s' % time.asctime()

        # check where we are running; an unset CMS_PATH simply means "not
        # on CAF" instead of aborting the thread with a KeyError.
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        inFile = None
        lumiRangeFile = None
        aborted = False
        for istep, com in enumerate(self.wf.cmds, 1):
            if aborted:
                self._markNotRun()
                continue
            # isInputOk is used to keep track of the das result. In case this
            # is False we use a different error message to indicate the
            # failed das query.
            isInputOk = True
            cmd = preamble
            if not isinstance(com, str):
                if self.cafVeto and com.location == 'CAF' and not onCAF:
                    print("You need to be on CAF to run %s" % (self.wf.numId,))
                    self._markNotRun()
                    aborted = True
                    continue
                # create lumiRange file first so if das fails we get its
                # error code
                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 = cmd + cmd2 + self._logRedirect(istep, 'lumiRanges')
                    lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                    self.doCmd(cmd2)
                cmd += com.das(self.dasOptions)
                cmd += self._logRedirect(istep, 'dasquery')
                retStep = self.doCmd(cmd)
                # don't use the file list executed, but use the das command
                # of cmsDriver for next step.
                # If the das output is not there or it's empty, consider it
                # an issue of this step, not of the next one.
                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))
                if not self._hasDasFiles(dasOutputPath):
                    retStep = 1
                    isInputOk = False

                inFile = 'filelist:' + basename(dasOutputPath)
                print("---")
            else:
                cmd = self._driverCommand(com, istep, preamble, inFile, lumiRangeFile)
                # Both inputs are consumed by this step.
                inFile = None
                lumiRangeFile = None

                esReportWorkflow(workflow=self.wf.nameId,
                                 release=getenv("CMSSW_VERSION"),
                                 architecture=getenv("SCRAM_ARCH"),
                                 step=istep,
                                 command=cmd,
                                 status="STARTED",
                                 start_time=realstarttime.isoformat(),
                                 workflow_id=self.wf.numId)
                retStep = self.doCmd(cmd)

            if self._recordOutcome(retStep, isInputOk):
                # to skip processing of the remaining steps
                aborted = True

            now = datetime.now()
            elapsed = now - realstarttime
            esReportWorkflow(workflow=self.wf.nameId,
                             release=getenv("CMSSW_VERSION"),
                             architecture=getenv("SCRAM_ARCH"),
                             step=istep,
                             command=cmd,
                             status=self.stat[-1],
                             start_time=realstarttime.isoformat(),
                             end_time=now.isoformat(),
                             # timedelta.seconds alone wraps after one day.
                             delta_time=elapsed.days * 86400 + elapsed.seconds,
                             workflow_id=self.wf.numId,
                             log_file="%s/step%d_%s.log" % (self.wfDir, istep, self.wf.nameId))

        os.chdir(startDir)

        endtime = 'date %s' % time.asctime()
        tottime = '%s-%s' % (endtime, startime)

        #### wrap up ####

        # Step numbering in the summary starts at 0.
        logStat = ''.join('Step%d-%s ' % (i, s) for i, s in enumerate(self.stat))
        self.report = '%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime) + ' '.join(map(str, self.retStep)) + '\n'

        return
297 
298 
299 
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
npass
needs to set self.report
double split
Definition: MVATrainer.cc:139