CMS 3D CMS Logo

WorkFlowRunner.py
Go to the documentation of this file.
1 from __future__ import print_function
2 from threading import Thread
3 from Configuration.PyReleaseValidation import WorkFlow
4 import os,time
5 import shutil
6 from subprocess import Popen
7 from os.path import exists, basename, join
8 from datetime import datetime
9 
class WorkFlowRunner(Thread):
    """Thread that executes the steps of one workflow inside its own directory.

    The workflow object ``wf`` must provide ``numId``, ``nameId`` and ``cmds``.
    Each entry of ``wf.cmds`` is either

    * a ``str``: a command line that is completed with chaining options
      (``--filein``, ``--fileout``, ...) and run through the shell, or
    * an input-description object exposing ``location``, ``dataSet``,
      ``dataSetParent``, ``lumiRanges()`` and ``das(options, dataset)``,
      whose shell commands are run to produce the file list for the next step.

    After ``run()`` has finished, ``stat``, ``retStep``, ``npass`` and ``nfail``
    hold one entry per step and ``report`` holds a one-line summary.
    """

    # Raw wait status that run() classifies as a timed-out step.
    TIMEOUT_STATUS = 32000

    def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="",
                 jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0):
        """Store the workflow and the run options.

        Args:
            wf: workflow object (see the class docstring for what is read).
            noRun: append ``--no_exec`` to every string command.
            dryRun: log commands without executing them.
            cafVeto: skip inputs whose ``location`` is ``'CAF'`` unless
                ``CMS_PATH`` contains ``cms/caf/cms``.
            dasOptions: first argument handed to the input object's ``das()``.
            jobReport: append ``--suffix "-j JobReport<step>.xml "``.
            nThreads: when > 1, append ``--nThreads`` (not for harvesting steps).
            nStreams: when > 0, append ``--nStreams`` (not for harvesting steps).
            maxSteps: steps with a larger 1-based index are only written to
                ``wf_steps.txt`` instead of being executed.
            nEvents: when > 0, overrides the value of the ``-n`` option.
        """
        Thread.__init__(self)
        self.wf = wf

        self.status = -1
        self.report = ''
        # NOTE: these start as integers and are replaced by per-step lists in run().
        self.nfail = 0
        self.npass = 0
        self.noRun = noRun
        self.dryRun = dryRun
        self.cafVeto = cafVeto
        self.dasOptions = dasOptions
        self.jobReport = jobReport
        self.nThreads = nThreads
        self.nStreams = nStreams
        self.maxSteps = maxSteps
        self.nEvents = nEvents
        # Output file of the most recent step whose command mentions RECO.
        self.recoOutput = ''

        self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId

    def doCmd(self, cmd):
        """Log ``cmd`` and, unless in dry-run mode, execute it through the shell.

        The message is printed and appended to ``<wfDir>/cmdLog``.

        Returns:
            0 in dry-run mode; otherwise the raw status word reported by
            ``os.waitpid`` (not the decoded exit code). ``run()`` compares
            this raw value against ``TIMEOUT_STATUS``.
        """
        msg = "\n# in: " + os.getcwd()
        if self.dryRun:
            msg += " dryRun for '"
        else:
            msg += " going to execute "
        msg += cmd.replace(';', '\n')
        print(msg)

        with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
            cmdLog.write(msg + '\n')

        ret = 0
        if not self.dryRun:
            # The command string is assembled from the workflow definition and
            # relies on shell features (cd, ';', redirection), hence shell=True.
            p = Popen(cmd, shell=True)
            ret = os.waitpid(p.pid, 0)[1]
            if ret != 0:
                print("ERROR executing ", cmd, 'ret=', ret)

        return ret

    @staticmethod
    def _closeCmd(istep, tag):
        """Return the shell suffix redirecting output to ``step<istep>_<tag>.log``."""
        return ' > %s 2>&1; ' % ('step%d_' % (istep,) + tag + '.log ',)

    @staticmethod
    def _dasFiles(path):
        """Return the lines of the DAS log at ``path`` that start with '/'.

        A missing log yields an empty list, exactly like a log without any
        logical file name (DAS sometimes fails and still prints junk).
        """
        if not exists(path):
            return []
        with open(path) as dasLog:
            return [line for line in dasLog.read().split("\n") if line.startswith("/")]

    @staticmethod
    def _stepSequence(cmd):
        """Return the word following the first ``-s `` in ``cmd``, or '' if absent.

        Relies on the syntax: cmsDriver -s STEPS --otherFlags
        """
        parts = cmd.split("-s ")
        if len(parts) < 2:
            return ''
        return parts[1].split(" ")[0]

    @staticmethod
    def _overrideEventCount(cmd, nEvents):
        """Return ``cmd`` with the value of its first `` -n `` option set to ``nEvents``.

        When ``cmd`` carries no `` -n `` option the option is appended instead,
        so the requested event count is always honoured.
        """
        event_token = " -n "
        head, found, tail = cmd.partition(event_token)
        if not found:
            return cmd + event_token + '%s ' % nEvents
        # Drop the old value (first word after the token), keep everything else.
        remainder = " ".join(tail.split(" ")[1:])
        return head + event_token + '%s ' % nEvents + remainder

    def _chainedInput(self, cmd, istep):
        """Return the ``--filein`` option connecting step ``istep`` to earlier output."""
        previous = ' --filein file:step%s.root ' % (istep - 1,)
        steps = self._stepSequence(cmd)
        if "ALCA" not in steps or "RECO" in steps:
            return previous
        # ALCA without RECO in the same step: read the last RECO output if known.
        if self.recoOutput:
            return ' --filein %s' % (self.recoOutput)
        return previous

    def _recordStep(self, retStep, status):
        """Append the outcome of one step to the per-step bookkeeping lists."""
        self.retStep.append(retStep)
        self.stat.append(status)
        self.npass.append(1 if status == 'PASSED' else 0)
        self.nfail.append(1 if status in ('FAILED', 'DAS_ERROR', 'TIMEOUT') else 0)

    def run(self):
        """Execute all workflow steps in order and fill in ``self.report``.

        Once a step fails, times out or is vetoed, every remaining step is
        recorded as ``NOTRUN``.
        """
        startDir = os.getcwd()

        if not os.path.exists(self.wfDir):
            os.makedirs(self.wfDir)
        elif not self.dryRun:
            # clean up to allow re-running in the same overall devel area,
            # then recreate the dir to make sure it exists
            print("cleaning up ", self.wfDir, ' in ', os.getcwd())
            shutil.rmtree(self.wfDir)
            os.makedirs(self.wfDir)

        # Every command changes into the workflow directory within its own shell.
        preamble = 'cd ' + self.wfDir + '; '

        startime = 'date %s' % time.asctime()

        # check where we are running (an unset CMS_PATH simply means "not on CAF")
        onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

        self.npass = []
        self.nfail = []
        self.stat = []
        self.retStep = []

        inFile = None
        lumiRangeFile = None
        aborted = False
        for istep, com in enumerate(self.wf.cmds, 1):
            if aborted:
                self._recordStep(0, 'NOTRUN')
                continue

            # isInputOk keeps track of the das result. In case this is False
            # a different status is used to indicate the failed das query.
            isInputOk = True

            if not isinstance(com, str):
                if self.cafVeto and (com.location == 'CAF' and not onCAF):
                    print("You need to be on CAF to run", self.wf.numId)
                    self._recordStep(0, 'NOTRUN')
                    aborted = True
                    continue
                # create lumiRange file first so if das fails we get its error code
                lumiCmd = com.lumiRanges()
                if lumiCmd:
                    lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                    self.doCmd(preamble + lumiCmd + self._closeCmd(istep, 'lumiRanges'))
                if com.dataSetParent:
                    self.doCmd(preamble + com.das(self.dasOptions, com.dataSetParent)
                               + self._closeCmd(istep, 'dasparentquery'))
                # Only the status of the main das query decides the step result.
                retStep = self.doCmd(preamble + com.das(self.dasOptions, com.dataSet)
                                     + self._closeCmd(istep, 'dasquery'))
                # If the das output is not there or it's empty, consider it an
                # issue of this step, not of the next one.
                dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))
                # Check created das output in no-dryRun mode only
                if not self.dryRun and not self._dasFiles(dasOutputPath):
                    retStep = 1
                    isInputOk = False

                # don't use the file list executed, but hand the das output
                # to the next step as a file list
                inFile = 'filelist:' + basename(dasOutputPath)
                print("---")
            else:
                # chaining IO, which should be done in WF object already and
                # not using stepX.root but <stepName>.root
                cmd = preamble + com
                if self.noRun:
                    cmd += ' --no_exec'
                # in case previous step used DAS query (either filelist or das:)
                # not to be applied for premixing stage1 to allow combined stage1+stage2 workflow
                if inFile and 'premix_stage1' not in cmd:
                    cmd += ' --filein ' + inFile
                    inFile = None
                if lumiRangeFile:  # DAS query can also restrict lumi range
                    cmd += ' --lumiToProcess ' + lumiRangeFile
                    lumiRangeFile = None
                # 134 is an existing workflow where harvesting has to operate on
                # AlcaReco and NOT on DQM; hard-coded..
                if 'HARVESTING' in cmd and not 134 == self.wf.numId and '--filein' not in cmd:
                    cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
                else:
                    # Disable input for premix stage1 to allow combined stage1+stage2 workflow
                    # Disable input for premix stage2 in FastSim to allow combined
                    # stage1+stage2 workflow (in FS, stage2 does also GEN)
                    # Ugly hack but works
                    fastStage2 = "--fast" in cmd and "premix_stage2" in cmd
                    if istep != 1 and '--filein' not in cmd and 'premix_stage1' not in cmd and not fastStage2:
                        cmd += self._chainedInput(cmd, istep)
                    if '--fileout' not in com:
                        cmd += ' --fileout file:step%s.root ' % (istep,)
                        if "RECO" in cmd:
                            self.recoOutput = "file:step%d.root" % (istep)
                if self.jobReport:
                    cmd += ' --suffix "-j JobReport%s.xml " ' % istep
                notHarvesting = ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd)
                if (self.nThreads > 1) and notHarvesting:
                    cmd += ' --nThreads %s' % self.nThreads
                if (self.nStreams > 0) and notHarvesting:
                    cmd += ' --nStreams %s' % self.nStreams
                if self.nEvents > 0:
                    cmd = self._overrideEventCount(cmd, self.nEvents)
                cmd += self._closeCmd(istep, self.wf.nameId)

                retStep = 0
                if istep > self.maxSteps:
                    # Beyond the step limit: only record what would have run.
                    with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                        wf_stats.write('step%s:%s\n' % (istep, cmd))
                else:
                    retStep = self.doCmd(cmd)

            if retStep == self.TIMEOUT_STATUS:
                # A timeout occurred
                status = 'TIMEOUT'
            elif retStep != 0:
                # error occurred
                status = 'FAILED' if isInputOk else 'DAS_ERROR'
            else:
                # things went fine
                status = 'PASSED'
            self._recordStep(retStep, status)
            if status != 'PASSED':
                # to skip processing of the remaining steps
                aborted = True

        # Nothing above changes directory in this process (each step does its
        # own `cd` in a sub-shell); restoring startDir is kept as a safeguard.
        os.chdir(startDir)

        endtime = 'date %s' % time.asctime()
        tottime = '%s-%s' % (endtime, startime)

        # Step labels in the report are 0-based, unlike the 1-based log file names.
        logStat = ''.join('Step%d-%s ' % (i, s) for i, s in enumerate(self.stat))
        self.report = ('%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime)
                       + ' '.join(map(str, self.retStep)) + '\n')
231 
def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:21
recoOutput
relying on the syntax: cmsDriver -s STEPS --otherFlags
npass
needs to set self.report
#define str(s)