CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
WorkFlowRunner.WorkFlowRunner Class Reference
Inheritance diagram for WorkFlowRunner.WorkFlowRunner: WorkFlowRunner derives from Thread (its constructor calls Thread.__init__).

Public Member Functions

def __init__ (self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
 
def doCmd (self, cmd)
 
def run (self)
 

Public Attributes

 cafVeto
 
 dasOptions
 
 dryRun
 
 jobReport
 
 maxSteps
 
 nfail
 
 noRun
 
 npass
 needs to set self.report More...
 
 nThreads
 
 report
 wrap up #### More...
 
 retStep
 
 stat
 
 status
 
 wf
 
 wfDir
 

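Detailed Description

Thread that runs the commands of a single workflow (wf.cmds) one step at a time inside a per-workflow directory named <numId>_<nameId>. Every command is appended to the cmdLog file in that directory and, unless dryRun is set, executed through a shell. For each step, run() records the return code, a pass/fail flag and a status string (PASSED, FAILED, DAS_ERROR, TIMEOUT or NOTRUN); once a step fails, the remaining steps are marked NOTRUN. The one-line summary is stored in report.

Definition at line 9 of file WorkFlowRunner.py.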

Constructor & Destructor Documentation

def WorkFlowRunner.WorkFlowRunner.__init__ (   self,
  wf,
  noRun = False,
  dryRun = False,
  cafVeto = True,
  dasOptions = "",
  jobReport = False,
  nThreads = 1,
  maxSteps = 9999 
)

Definition at line 10 of file WorkFlowRunner.py.

10  def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1, maxSteps=9999):
11  Thread.__init__(self)
12  self.wf = wf
13 
14  self.status=-1
15  self.report=''
16  self.nfail=0
17  self.npass=0
18  self.noRun=noRun
19  self.dryRun=dryRun
20  self.cafVeto=cafVeto
21  self.dasOptions=dasOptions
22  self.jobReport=jobReport
23  self.nThreads=nThreads
24  self.maxSteps = maxSteps
25 
26  self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
27  return
28 
def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
npass
needs to set self.report
#define str(s)

Member Function Documentation

def WorkFlowRunner.WorkFlowRunner.doCmd (   self,
  cmd 
)

Definition at line 29 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, and WorkFlowRunner.WorkFlowRunner.wfDir.

29  def doCmd(self, cmd):
30 
31  msg = "\n# in: " +os.getcwd()
32  if self.dryRun: msg += " dryRun for '"
33  else: msg += " going to execute "
34  msg += cmd.replace(';','\n')
35  print msg
36 
37  cmdLog = open(self.wfDir+'/cmdLog','a')
38  cmdLog.write(msg+'\n')
39  cmdLog.close()
40 
41  ret = 0
42  if not self.dryRun:
43  p = Popen(cmd, shell=True)
44  ret = os.waitpid(p.pid, 0)[1]
45  if ret != 0:
46  print "ERROR executing ",cmd,'ret=', ret
47 
48  return ret
49 
def WorkFlowRunner.WorkFlowRunner.run (   self)

Definition at line 50 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, runall.testit.nfail, WorkFlowRunner.WorkFlowRunner.nfail, runall.testit.npass, WorkFlowRunner.WorkFlowRunner.npass, TShapeAnalysis.npass, and WorkFlowRunner.WorkFlowRunner.wfDir.

Referenced by Types.EventID.cppID(), and Types.LuminosityBlockID.cppID().

50  def run(self):
51 
52  startDir = os.getcwd()
53 
54  if not os.path.exists(self.wfDir):
55  os.makedirs(self.wfDir)
56  elif not self.dryRun: # clean up to allow re-running in the same overall devel area, then recreate the dir to make sure it exists
57  print "cleaning up ", self.wfDir, ' in ', os.getcwd()
58  shutil.rmtree(self.wfDir)
59  os.makedirs(self.wfDir)
60 
61  preamble = 'cd '+self.wfDir+'; '
62 
63  realstarttime = datetime.now()
64  startime='date %s' %time.asctime()
65 
66  # check where we are running:
67  onCAF = False
68  if 'cms/caf/cms' in os.environ['CMS_PATH']:
69  onCAF = True
70 
71  ##needs to set
72  #self.report
73  self.npass = []
74  self.nfail = []
75  self.stat = []
76  self.retStep = []
77 
78  def closeCmd(i,ID):
79  return ' > %s 2>&1; ' % ('step%d_'%(i,)+ID+'.log ',)
80 
81  inFile=None
82  lumiRangeFile=None
83  aborted=False
84  for (istepmone,com) in enumerate(self.wf.cmds):
85  # isInputOk is used to keep track of the das result. In case this
86  # is False we use a different error message to indicate the failed
87  # das query.
88  isInputOk=True
89  istep=istepmone+1
90  cmd = preamble
91  if aborted:
92  self.npass.append(0)
93  self.nfail.append(0)
94  self.retStep.append(0)
95  self.stat.append('NOTRUN')
96  continue
97  if not isinstance(com,str):
98  if self.cafVeto and (com.location == 'CAF' and not onCAF):
99  print "You need to be no CAF to run",self.wf.numId
100  self.npass.append(0)
101  self.nfail.append(0)
102  self.retStep.append(0)
103  self.stat.append('NOTRUN')
104  aborted=True
105  continue
106  #create lumiRange file first so if das fails we get its error code
107  cmd2 = com.lumiRanges()
108  if cmd2:
109  cmd2 =cmd+cmd2+closeCmd(istep,'lumiRanges')
110  lumiRangeFile='step%d_lumiRanges.log'%(istep,)
111  retStep = self.doCmd(cmd2)
112  if (com.dataSetParent):
113  cmd3=cmd+com.das(self.dasOptions,com.dataSetParent)+closeCmd(istep,'dasparentquery')
114  retStep = self.doCmd(cmd3)
115  cmd+=com.das(self.dasOptions,com.dataSet)
116  cmd+=closeCmd(istep,'dasquery')
117  retStep = self.doCmd(cmd)
118  #don't use the file list executed, but use the das command of cmsDriver for next step
119  # If the das output is not there or it's empty, consider it an
120  # issue of this step, not of the next one.
121  dasOutputPath = join(self.wfDir, 'step%d_dasquery.log'%(istep,))
122  if not exists(dasOutputPath):
123  retStep = 1
124  dasOutput = None
125  else:
126  # We consider only the files which have at least one logical filename
127  # in it. This is because sometimes das fails and still prints out junk.
128  dasOutput = [l for l in open(dasOutputPath).read().split("\n") if l.startswith("/")]
129  if not dasOutput:
130  retStep = 1
131  isInputOk = False
132 
133  inFile = 'filelist:' + basename(dasOutputPath)
134  print "---"
135  else:
136  #chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
137  cmd += com
138  if self.noRun:
139  cmd +=' --no_exec'
140  # in case previous step used DAS query (either filelist of das:)
141  # not to be applied for premixing stage1 to allow combined stage1+stage2 workflow
142  if inFile and not 'premix_stage1' in cmd:
143  cmd += ' --filein '+inFile
144  inFile=None
145  if lumiRangeFile: #DAS query can also restrict lumi range
146  cmd += ' --lumiToProcess '+lumiRangeFile
147  lumiRangeFile=None
148  # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
149  if 'HARVESTING' in cmd and not 134==self.wf.numId and not '--filein' in cmd:
150  cmd+=' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
151  else:
152  # Disable input for premix stage1 to allow combined stage1+stage2 workflow
153  # Bit of a hack but works
154  if istep!=1 and not '--filein' in cmd and not 'premix_stage1' in cmd:
155  cmd+=' --filein file:step%s.root '%(istep-1,)
156  if not '--fileout' in com:
157  cmd+=' --fileout file:step%s.root '%(istep,)
158  if self.jobReport:
159  cmd += ' --suffix "-j JobReport%s.xml " ' % istep
160  if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
161  cmd += ' --nThreads %s' % self.nThreads
162  cmd+=closeCmd(istep,self.wf.nameId)
163  retStep = 0
164  if istep>self.maxSteps:
165  wf_stats = open("%s/wf_steps.txt" % self.wfDir,"a")
166  wf_stats.write('step%s:%s\n' % (istep, cmd))
167  wf_stats.close()
168  else: retStep = self.doCmd(cmd)
169 
170  self.retStep.append(retStep)
171  if retStep == 32000:
172  # A timeout occurred
173  self.npass.append(0)
174  self.nfail.append(1)
175  self.stat.append('TIMEOUT')
176  aborted = True
177  elif (retStep!=0):
178  #error occurred
179  self.npass.append(0)
180  self.nfail.append(1)
181  if not isInputOk:
182  self.stat.append("DAS_ERROR")
183  else:
184  self.stat.append('FAILED')
185  #to skip processing
186  aborted=True
187  else:
188  #things went fine
189  self.npass.append(1)
190  self.nfail.append(0)
191  self.stat.append('PASSED')
192 
193  os.chdir(startDir)
194  endtime='date %s' %time.asctime()
195  tottime='%s-%s'%(endtime,startime)
196 
197 
198  #### wrap up ####
199 
200  logStat=''
201  for i,s in enumerate(self.stat):
202  logStat+='Step%d-%s '%(i,s)
203  self.report='%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+' '.join(map(str,self.retStep))+'\n'
204 
205  return
206 

Member Data Documentation

WorkFlowRunner.WorkFlowRunner.cafVeto

Definition at line 20 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dasOptions

Definition at line 21 of file WorkFlowRunner.py.

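WorkFlowRunner.WorkFlowRunner.dryRun

Definition at line 19 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.doCmd(), and WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.jobReport

Definition at line 22 of file WorkFlowRunner.py.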

WorkFlowRunner.WorkFlowRunner.maxSteps

Definition at line 24 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.nfail

Definition at line 16 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.noRun

Definition at line 18 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.npass

needs to set self.report

Definition at line 17 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.nThreads

Definition at line 23 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.report

wrap up ####

Definition at line 15 of file WorkFlowRunner.py.

Referenced by dataset.Dataset.getPrimaryDatasetEntries(), and addOnTests.testit.run().

WorkFlowRunner.WorkFlowRunner.retStep

Definition at line 76 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.stat

Definition at line 75 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.status

Definition at line 14 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wf

Definition at line 12 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wfDir