CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
WorkFlowRunner.WorkFlowRunner Class Reference
Inheritance diagram for WorkFlowRunner.WorkFlowRunner:

Public Member Functions

def __init__ (self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
 
def doCmd (self, cmd)
 
def run (self)
 

Public Attributes

 cafVeto
 
 dasOptions
 
 dryRun
 
 jobReport
 
 maxSteps
 
 nfail
 
 noRun
 
 npass
 needs to set self.report More...
 
 nThreads
 
 report
 wrap up #### More...
 
 retStep
 
 stat
 
 status
 
 wf
 
 wfDir
 

Detailed Description

Definition at line 9 of file WorkFlowRunner.py.

Constructor & Destructor Documentation

def WorkFlowRunner.WorkFlowRunner.__init__ (   self,
  wf,
  noRun = False,
  dryRun = False,
  cafVeto = True,
  dasOptions = "",
  jobReport = False,
  nThreads = 1,
  maxSteps = 9999 
)

Definition at line 10 of file WorkFlowRunner.py.

def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999):
    """Prepare a runner thread for a single workflow.

    Parameters
    ----------
    wf
        Workflow description. This class reads ``wf.numId``, ``wf.nameId``
        and (in ``run``) ``wf.cmds``.
    noRun
        When true, ``run`` appends ``--no_exec`` to every plain command step.
    dryRun
        When true, ``doCmd`` only logs the command instead of executing it,
        and ``run`` does not wipe an existing workflow directory.
    cafVeto
        When true, ``run`` skips input steps whose ``location`` is ``'CAF'``
        unless the job is detected to be running on CAF.
    dasOptions
        Passed through to ``com.das(...)`` for input (non-string) steps.
    jobReport
        When true, ``run`` appends a ``--suffix "-j JobReport<N>.xml "``
        option to each plain command step.
    nThreads
        When greater than 1, ``run`` appends ``--nThreads <n>`` to every
        plain command step that does not contain ``HARVESTING``.
    maxSteps
        Steps numbered above this value are written to ``wf_steps.txt``
        by ``run`` instead of being executed.
    """
    Thread.__init__(self)
    self.wf = wf

    self.status = -1
    self.report = ''
    # Plain counters here; run() rebinds both to per-step lists.
    self.nfail = 0
    self.npass = 0
    # Filled per step by run().  Defined up front so that inspecting a
    # runner that has not (successfully) run does not raise AttributeError.
    self.stat = []
    self.retStep = []

    self.noRun = noRun
    self.dryRun = dryRun
    self.cafVeto = cafVeto
    self.dasOptions = dasOptions
    self.jobReport = jobReport
    self.nThreads = nThreads
    self.maxSteps = maxSteps

    # Working directory of the workflow: "<numId>_<nameId>".
    self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId
28 
def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
npass
needs to set self.report

Member Function Documentation

def WorkFlowRunner.WorkFlowRunner.doCmd (   self,
  cmd 
)

Definition at line 29 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, and WorkFlowRunner.WorkFlowRunner.wfDir.

def doCmd(self, cmd):
    """Log a shell command and, unless this is a dry run, execute it.

    The command is echoed to stdout and appended to ``<wfDir>/cmdLog``
    (with every ``;`` shown as a line break for readability).

    Returns 0 for a dry run; otherwise the raw status word reported by
    ``os.waitpid`` for the spawned shell.  Note that this is *not* the
    plain exit code: for a normal exit it is ``exit_code << 8``.  Callers
    in this file compare against such raw values, so it is passed through
    unchanged.
    """
    msg = "\n# in: " + os.getcwd()
    if self.dryRun:
        msg += " dryRun for '"
    else:
        msg += " going to execute "
    msg += cmd.replace(';', '\n')
    # Single-argument print(...) emits the same text on Python 2 and 3.
    print(msg)

    # 'with' guarantees the log is closed even if the write fails.
    with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
        cmdLog.write(msg + '\n')

    if self.dryRun:
        return 0

    # cmd is a full shell pipeline (cd ...; prog ... > log 2>&1;), hence shell=True.
    p = Popen(cmd, shell=True)
    ret = os.waitpid(p.pid, 0)[1]
    if ret != 0:
        print("ERROR executing  %s ret= %s" % (cmd, ret))

    return ret
49 
def WorkFlowRunner.WorkFlowRunner.run (   self)

Definition at line 50 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, runall.testit.nfail, WorkFlowRunner.WorkFlowRunner.nfail, runall.testit.npass, WorkFlowRunner.WorkFlowRunner.npass, TShapeAnalysis.npass, and WorkFlowRunner.WorkFlowRunner.wfDir.

Referenced by Types.EventID.cppID(), and Types.LuminosityBlockID.cppID().

def run(self):
    """Execute every step of the workflow and record the per-step outcome.

    For each entry of ``self.wf.cmds`` a shell command is assembled and
    handed to ``self.doCmd``.  String entries are treated as ready-made
    commands that get input/output chaining options appended; any other
    entry is treated as an input description providing ``location``,
    ``lumiRanges()`` and ``das(options)``.

    Results are stored on the instance, one element per step:

    * ``self.retStep`` - value returned by ``doCmd`` (0 for skipped steps)
    * ``self.stat``    - 'PASSED', 'FAILED', 'DAS_ERROR', 'TIMEOUT' or 'NOTRUN'
    * ``self.npass`` / ``self.nfail`` - 1/0 flags matching ``stat``

    After the first failing step all remaining steps are marked 'NOTRUN'.
    Finally ``self.report`` is set to a one-line summary.
    """
    startDir = os.getcwd()

    if not os.path.exists(self.wfDir):
        os.makedirs(self.wfDir)
    elif not self.dryRun:
        # clean up to allow re-running in the same overall devel area,
        # then recreate the dir to make sure it exists
        print("cleaning up  %s  in  %s" % (self.wfDir, os.getcwd()))
        shutil.rmtree(self.wfDir)
        os.makedirs(self.wfDir)

    # Every command is run from inside the workflow directory.
    preamble = 'cd ' + self.wfDir + '; '

    startime = 'date %s' % time.asctime()

    # check where we are running; a missing CMS_PATH simply means "not on CAF"
    onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

    self.npass = []
    self.nfail = []
    self.stat = []
    self.retStep = []

    def closeCmd(i, ID):
        # Redirect stdout+stderr of a step into step<i>_<ID>.log
        return ' > %s 2>&1; ' % ('step%d_' % (i,) + ID + '.log ',)

    def markNotRun():
        # Bookkeeping for a step that is skipped without being executed.
        self.npass.append(0)
        self.nfail.append(0)
        self.retStep.append(0)
        self.stat.append('NOTRUN')

    inFile = None
    lumiRangeFile = None
    aborted = False
    for istep, com in enumerate(self.wf.cmds, 1):
        # isInputOk is used to keep track of the das result. In case this
        # is False we use a different error message to indicate the failed
        # das query.
        isInputOk = True
        cmd = preamble
        if aborted:
            markNotRun()
            continue
        if not isinstance(com, str):
            if self.cafVeto and (com.location == 'CAF' and not onCAF):
                print("You need to be on CAF to run %s" % (self.wf.numId,))
                markNotRun()
                aborted = True
                continue
            # create lumiRange file first so if das fails we get its error code
            # (the status of the lumiRanges command itself is overwritten below)
            cmd2 = com.lumiRanges()
            if cmd2:
                cmd2 = cmd + cmd2 + closeCmd(istep, 'lumiRanges')
                lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                retStep = self.doCmd(cmd2)
            cmd += com.das(self.dasOptions)
            cmd += closeCmd(istep, 'dasquery')
            retStep = self.doCmd(cmd)
            # don't use the file list executed, but use the das command of cmsDriver for next step
            # If the das output is not there or it's empty, consider it an
            # issue of this step, not of the next one.
            dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))
            if not exists(dasOutputPath):
                retStep = 1
                dasOutput = None
            else:
                # We consider only the files which have at least one logical filename
                # in it. This is because sometimes das fails and still prints out junk.
                with open(dasOutputPath) as dasFile:
                    dasOutput = [l for l in dasFile.read().split("\n") if l.startswith("/")]
            if not dasOutput:
                retStep = 1
                isInputOk = False

            inFile = 'filelist:' + basename(dasOutputPath)
            print("---")
        else:
            # chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
            cmd += com
            if self.noRun:
                cmd += ' --no_exec'
            if inFile:  # in case previous step used DAS query (either filelist of das:)
                cmd += ' --filein ' + inFile
                inFile = None
            if lumiRangeFile:  # DAS query can also restrict lumi range
                cmd += ' --lumiToProcess ' + lumiRangeFile
                lumiRangeFile = None
            # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
            if 'HARVESTING' in cmd and not 134 == self.wf.numId and not '--filein' in cmd:
                cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
            else:
                if istep != 1 and not '--filein' in cmd:
                    cmd += ' --filein file:step%s.root ' % (istep - 1,)
                if not '--fileout' in com:
                    cmd += ' --fileout file:step%s.root ' % (istep,)
            if self.jobReport:
                cmd += ' --suffix "-j JobReport%s.xml " ' % istep
            if (self.nThreads > 1) and ('HARVESTING' not in cmd):
                cmd += ' --nThreads %s' % self.nThreads
            cmd += closeCmd(istep, self.wf.nameId)
            retStep = 0
            if istep > self.maxSteps:
                # Beyond maxSteps: only record the command, do not execute it.
                with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                    wf_stats.write('step%s:%s\n' % (istep, cmd))
            else:
                retStep = self.doCmd(cmd)

        self.retStep.append(retStep)
        if retStep == 32000:
            # A timeout occurred.  32000 == 125 << 8, i.e. the raw wait
            # status of a shell that exited with code 125.
            self.npass.append(0)
            self.nfail.append(1)
            self.stat.append('TIMEOUT')
            aborted = True
        elif retStep != 0:
            # error occurred
            self.npass.append(0)
            self.nfail.append(1)
            if not isInputOk:
                self.stat.append("DAS_ERROR")
            else:
                self.stat.append('FAILED')
            # to skip processing of the remaining steps
            aborted = True
        else:
            # things went fine
            self.npass.append(1)
            self.nfail.append(0)
            self.stat.append('PASSED')

    os.chdir(startDir)

    endtime = 'date %s' % time.asctime()
    # NOTE: this is the two timestamps joined by '-', not an elapsed time.
    tottime = '%s-%s' % (endtime, startime)

    #### wrap up ####

    # Step labels in the report are 0-based (Step0 is the first step).
    logStat = ''
    for i, s in enumerate(self.stat):
        logStat += 'Step%d-%s ' % (i, s)
    self.report = '%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime) + ' '.join(map(str, self.retStep)) + '\n'

    return
199 

Member Data Documentation

WorkFlowRunner.WorkFlowRunner.cafVeto

Definition at line 20 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dasOptions

Definition at line 21 of file WorkFlowRunner.py.

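WorkFlowRunner.WorkFlowRunner.dryRun

Definition at line 19 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.doCmd(), and WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.jobReport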

Definition at line 22 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.maxSteps

Definition at line 24 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.nfail

Definition at line 16 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.noRun

Definition at line 18 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.npass

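Per-step pass flags. The constructor initialises this to 0; run() rebinds it to a list and appends 1 for every step that passed and 0 for every other step. (The generated brief "needs to set self.report" is a source comment that happens to precede this assignment in run().)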

Definition at line 17 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.nThreads

Definition at line 23 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.report

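One-line summary of the workflow, assembled in the "wrap up" section at the end of run(): workflow number and name, the status of each step, the start/end time string, and the exit value of each step. The constructor initialises it to an empty string.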

Definition at line 15 of file WorkFlowRunner.py.

Referenced by dataset.Dataset.getPrimaryDatasetEntries(), and addOnTests.testit.run().

WorkFlowRunner.WorkFlowRunner.retStep

Definition at line 76 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.stat

Definition at line 75 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.status

Definition at line 14 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wf

Definition at line 12 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wfDir