CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
WorkFlowRunner.py
Go to the documentation of this file.
1 
2 from threading import Thread
3 
4 from Configuration.PyReleaseValidation import WorkFlow
5 import os,time
6 import shutil
7 from subprocess import Popen
8 
class WorkFlowRunner(Thread):
    """Thread that executes the steps of one workflow in a dedicated directory.

    The workflow object ``wf`` must provide ``numId``, ``nameId`` and ``cmds``.
    Each entry of ``wf.cmds`` is either a command string, or an object that
    provides ``location``, ``lumiRanges()`` and ``das(options)`` (used to build
    an input-query step).

    After ``run()`` has finished the following attributes are filled, one
    entry per step of ``wf.cmds``:

    * ``npass`` / ``nfail`` -- lists of 0/1 flags,
    * ``stat``              -- 'PASSED', 'FAILED' or 'NOTRUN',
    * ``retStep``           -- status returned by ``doCmd`` (0 if not run),

    and ``report`` holds a one-line summary string.
    """

    def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False):
        """Store the workflow and the run options.

        wf         -- workflow description (see class docstring)
        noRun      -- if true, ' --no_exec' is appended to string commands
        dryRun     -- if true, commands are only printed and logged
        cafVeto    -- if true, steps whose ``location`` is 'CAF' abort the
                      workflow when not running on CAF
        dasOptions -- passed through to ``com.das()``
        jobReport  -- if true, a '--suffix "-j JobReportN.xml "' option is
                      appended to string commands
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report = ''
        # NOTE: these two start out as ints but are replaced by per-step
        # lists once run() is called.
        self.nfail = 0
        self.npass = 0
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.dasOptions = dasOptions
        self.jobReport = jobReport

        # directory (relative to the current working directory) in which the
        # workflow runs and writes its logs
        self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId
        return

    def doCmd(self, cmd):
        """Print and log ``cmd``; execute it through the shell unless dryRun.

        The message is appended to ``<wfDir>/cmdLog``.  Returns 0 for a dry
        run, otherwise the raw status word reported by ``os.waitpid`` for the
        child process (non-zero means failure).
        """
        msg = "\n# in: " + os.getcwd()
        if self.dryRun:
            msg += " dryRun for '"
        else:
            msg += " going to execute "
        msg += cmd.replace(';', '\n')
        print(msg)

        # 'with' guarantees the log is closed even if the write fails
        with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
            cmdLog.write(msg + '\n')

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            # second element of waitpid()'s result is the raw wait status
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing  %s ret= %s" % (cmd, ret))

        return ret

    def _markNotRun(self):
        """Record the current step as skipped in all per-step result lists."""
        self.npass.append(0)
        self.nfail.append(0)
        self.retStep.append(0)
        self.stat.append('NOTRUN')

    def run(self):
        """Run all steps of the workflow in order and fill the result attributes.

        Processing stops at the first failing step (or at a CAF-only step when
        not on CAF and ``cafVeto`` is set); every remaining step is recorded
        as 'NOTRUN'.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun:
            # clean up to allow re-running in the same overall devel area,
            # then recreate the dir to make sure it exists
            print("cleaning up  %s  in  %s" % (self.wfDir, os.getcwd()))
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        # every command is executed from inside the workflow directory
        preamble = 'cd ' + self.wfDir + '; '

        startime = 'date %s' % time.asctime()

        # check where we are running; a missing CMS_PATH simply means
        # "not on CAF" instead of raising KeyError and killing the thread
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        # per-step results (self.report is set at the end)
        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        def closeCmd(i, ID):
            # redirect stdout+stderr of step i into step<i>_<ID>.log
            return ' > %s 2>&1; ' % ('step%d_' % (i,) + ID + '.log ',)

        inFile = None
        lumiRangeFile = None
        aborted = False
        for (istepmone, com) in enumerate(self.wf.cmds):
            istep = istepmone + 1
            cmd = preamble
            if aborted:
                self._markNotRun()
                continue

            if not isinstance(com, str):
                # input-query step
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run %s" % (self.wf.numId,))
                    self._markNotRun()
                    aborted = True
                    continue
                # create lumiRange file first so if das fails we get its error code
                cmd2 = com.lumiRanges()
                if cmd2:
                    cmd2 = cmd + cmd2 + closeCmd(istep, 'lumiRanges')
                    lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                    retStep = self.doCmd(cmd2)
                cmd += com.das(self.dasOptions)
                cmd += closeCmd(istep, 'dasquery')
                retStep = self.doCmd(cmd)
                # don't use the file list executed, but use the das command
                # of cmsDriver for next step
                inFile = 'filelist:step%d_dasquery.log' % (istep,)
                print("---")
            else:
                # chaining IO, which should be done in WF object already and
                # not using stepX.root but <stepName>.root
                cmd += com
                if self.noRun:
                    cmd += ' --no_exec'
                if inFile:
                    # previous step was a DAS query: feed its output in
                    cmd += ' --filein ' + inFile
                    inFile = None
                if lumiRangeFile:
                    # DAS query can also restrict lumi range
                    cmd += ' --lumiToProcess ' + lumiRangeFile
                    lumiRangeFile = None
                # 134 is an existing workflow where harvesting has to operate
                # on AlcaReco and NOT on DQM; hard-coded..
                if 'HARVESTING' in cmd and not 134 == self.wf.numId and '--filein' not in cmd:
                    cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
                else:
                    if istep != 1 and '--filein' not in cmd:
                        cmd += ' --filein file:step%s.root ' % (istep - 1,)
                    if '--fileout' not in com:
                        cmd += ' --fileout file:step%s.root ' % (istep,)
                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                cmd += closeCmd(istep, self.wf.nameId)
                retStep = self.doCmd(cmd)

            self.retStep.append(retStep)
            if retStep != 0:
                # error occurred: record it and skip all remaining steps
                self.npass.append(0)
                self.nfail.append(1)
                self.stat.append('FAILED')
                aborted = True
            else:
                # things went fine
                self.npass.append(1)
                self.nfail.append(0)
                self.stat.append('PASSED')

        os.chdir(startDir)

        endtime = 'date %s' % time.asctime()
        tottime = '%s-%s' % (endtime, startime)

        #### wrap up ####

        # step labels in the report are zero-based (index from enumerate)
        logStat = ''
        for i, s in enumerate(self.stat):
            logStat += 'Step%d-%s ' % (i, s)
        self.report = '%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime) + ' '.join(map(str, self.retStep)) + '\n'

        return
166 
167 
168 
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
npass
needs to set self.report