CMS 3D CMS Logo

WorkFlowRunner.py
Go to the documentation of this file.
1 from threading import Thread
2 from Configuration.PyReleaseValidation import WorkFlow
3 import os,time
4 import shutil
5 from subprocess import Popen
6 from os.path import exists, basename, join
7 from datetime import datetime
8 
class WorkFlowRunner(Thread):
    """Run the steps of one release-validation workflow in a worker thread.

    Every step command is executed through the shell from inside a
    per-workflow directory named ``<numId>_<nameId>``. After ``run()``
    finishes, ``self.npass``, ``self.nfail``, ``self.stat`` and
    ``self.retStep`` hold one entry per step and ``self.report`` holds a
    one-line summary.
    """

    def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999):
        """Store the run configuration for workflow ``wf``.

        Parameters:
            wf: workflow object; ``numId``, ``nameId`` and ``cmds`` are read.
            noRun: append ``--no_exec`` to string (cmsDriver) steps.
            dryRun: only log commands, never execute them; an existing
                workflow directory is not cleaned up.
            cafVeto: do not run a DAS step whose ``location`` is 'CAF'
                when not running on CAF.
            dasOptions: extra options passed to the step's ``das()`` call.
            jobReport: add ``--suffix "-j JobReport<N>.xml "`` to steps.
            nThreads: add ``--nThreads`` when > 1 (not for harvesting).
            maxSteps: steps numbered above this are only written to
                ``wf_steps.txt`` instead of being executed.
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report = ''
        # Replaced by per-step lists in run().
        self.nfail = 0
        self.npass = 0
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.dasOptions = dasOptions
        self.jobReport = jobReport
        self.nThreads = nThreads
        self.maxSteps = maxSteps

        self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId

    def doCmd(self, cmd):
        """Log ``cmd`` and, unless in dry-run mode, execute it through the shell.

        The message is printed and appended to ``<wfDir>/cmdLog``; ';' in
        the command are shown as line breaks for readability.

        Returns the raw status word from ``os.waitpid`` (0 on success and
        always 0 in dry-run mode). This is not the plain exit code: an exit
        code ``n`` is reported as ``n << 8``.
        """
        readable = cmd.replace(';', '\n')
        msg = "\n# in: " + os.getcwd()
        if self.dryRun:
            # Close the quote opened in front of the command text.
            msg += " dryRun for '" + readable + "'"
        else:
            msg += " going to execute " + readable
        print(msg)

        with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
            cmdLog.write(msg + '\n')

        ret = 0
        if not self.dryRun:
            p = Popen(cmd, shell=True)
            # Keep waitpid's raw status (not p.wait()'s return code):
            # run() compares it against raw status values such as 32000.
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing  %s ret= %s" % (cmd, ret))
        return ret

    def _recordStep(self, retStep, stat, npass, nfail):
        """Append one step's outcome to the per-step bookkeeping lists."""
        self.retStep.append(retStep)
        self.npass.append(npass)
        self.nfail.append(nfail)
        self.stat.append(stat)

    @staticmethod
    def _hasDasFiles(dasOutputPath):
        """Return True if the DAS output file exists and lists a logical file name.

        Only lines starting with '/' count, because DAS sometimes fails and
        still prints out junk.
        """
        if not exists(dasOutputPath):
            return False
        with open(dasOutputPath) as dasOutput:
            return any(line.startswith("/") for line in dasOutput)

    def run(self):
        """Execute every step of the workflow in order and build the report.

        After a failure, a timeout or a CAF veto, all remaining steps are
        recorded as 'NOTRUN'. Results are stored in ``self.npass``,
        ``self.nfail``, ``self.stat`` and ``self.retStep`` (one entry per
        step) and summarized in ``self.report``.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun:
            # Clean up to allow re-running in the same overall devel area,
            # then recreate the dir to make sure it exists.
            print("cleaning up  %s  in  %s" % (self.wfDir, os.getcwd()))
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        preamble = 'cd ' + self.wfDir + '; '

        startime = 'date %s' % time.asctime()

        # Check where we are running. A missing CMS_PATH means "not on CAF"
        # rather than killing the thread with a KeyError.
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        def closeCmd(i, ID):
            # Redirect stdout and stderr of the step into step<i>_<ID>.log.
            return ' > %s 2>&1; ' % ('step%d_' % (i,) + ID + '.log ',)

        inFile = None
        lumiRangeFile = None
        aborted = False
        for istep, com in enumerate(self.wf.cmds, 1):
            # isInputOk tracks the DAS result: when False the failed step is
            # reported as DAS_ERROR instead of FAILED.
            isInputOk = True
            cmd = preamble
            if aborted:
                self._recordStep(0, 'NOTRUN', 0, 0)
                continue
            if not isinstance(com, str):
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run %s" % (self.wf.numId,))
                    self._recordStep(0, 'NOTRUN', 0, 0)
                    aborted = True
                    continue
                # Create the lumiRange file first so if DAS fails we get its
                # error code.
                lumiCmd = com.lumiRanges()
                if lumiCmd:
                    lumiCmd = cmd + lumiCmd + closeCmd(istep, 'lumiRanges')
                    lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                    retStep = self.doCmd(lumiCmd)
                if com.dataSetParent:
                    parentCmd = cmd + com.das(self.dasOptions, com.dataSetParent) + closeCmd(istep, 'dasparentquery')
                    retStep = self.doCmd(parentCmd)
                cmd += com.das(self.dasOptions, com.dataSet)
                cmd += closeCmd(istep, 'dasquery')
                retStep = self.doCmd(cmd)
                # Don't use the file list executed, but use the DAS command of
                # cmsDriver for the next step. If the DAS output is missing or
                # empty, consider it an issue of this step, not of the next.
                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))
                # In dry-run mode nothing was executed, so there is no output
                # to validate; checking it would fail this step and abort the
                # rest of the dry run.
                if not self.dryRun and not self._hasDasFiles(dasOutputPath):
                    retStep = 1
                    isInputOk = False
                inFile = 'filelist:' + basename(dasOutputPath)
                print("---")
            else:
                # Chaining IO, which should be done in the WF object already
                # and not using stepX.root but <stepName>.root.
                cmd += com
                if self.noRun:
                    cmd += ' --no_exec'
                # In case the previous step used a DAS query (either filelist
                # or das:). Not applied to premixing stage1, to allow a
                # combined stage1+stage2 workflow.
                if inFile and 'premix_stage1' not in cmd:
                    cmd += ' --filein ' + inFile
                    inFile = None
                if lumiRangeFile:  # DAS query can also restrict lumi range
                    cmd += ' --lumiToProcess ' + lumiRangeFile
                    lumiRangeFile = None
                # 134 is an existing workflow where harvesting has to operate
                # on AlcaReco and NOT on DQM; hard-coded.
                if 'HARVESTING' in cmd and not 134 == self.wf.numId and '--filein' not in cmd:
                    cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
                else:
                    # Disable input for premix stage1 to allow a combined
                    # stage1+stage2 workflow. Bit of a hack but works.
                    if istep != 1 and '--filein' not in cmd and 'premix_stage1' not in cmd:
                        cmd += ' --filein file:step%s.root ' % (istep - 1,)
                    if '--fileout' not in com:
                        cmd += ' --fileout file:step%s.root ' % (istep,)
                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                    cmd += ' --nThreads %s' % self.nThreads
                cmd += closeCmd(istep, self.wf.nameId)
                retStep = 0
                if istep > self.maxSteps:
                    # Beyond maxSteps: record the command instead of running it.
                    with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                        wf_stats.write('step%s:%s\n' % (istep, cmd))
                else:
                    retStep = self.doCmd(cmd)

            if retStep == 32000:
                # A timeout occurred. NOTE(review): 32000 is the raw wait
                # status for exit code 125 (125 << 8); presumably the exit
                # code of the timeout wrapper used by the caller -- confirm.
                self._recordStep(retStep, 'TIMEOUT', 0, 1)
                aborted = True
            elif retStep != 0:
                # Error occurred: skip processing of the remaining steps.
                self._recordStep(retStep, 'FAILED' if isInputOk else 'DAS_ERROR', 0, 1)
                aborted = True
            else:
                self._recordStep(retStep, 'PASSED', 1, 0)

        os.chdir(startDir)
        endtime = 'date %s' % time.asctime()
        tottime = '%s-%s' % (endtime, startime)

        #### wrap up ####
        # Step labels in the report are 0-based (Step0 is the first step).
        logStat = ''.join('Step%d-%s ' % (i, s) for i, s in enumerate(self.stat))
        self.report = '%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime) + ' '.join(map(str, self.retStep)) + '\n'
206 
def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
npass
needs to set self.report
#define str(s)
double split
Definition: MVATrainer.cc:139