CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
WorkFlowRunner.WorkFlowRunner Class Reference
Inheritance diagram for WorkFlowRunner.WorkFlowRunner:

Public Member Functions

def __init__ (self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
 
def doCmd (self, cmd)
 
def run (self)
 

Public Attributes

 cafVeto
 
 dasOptions
 
 dryRun
 
 jobReport
 
 maxSteps
 
 nfail
 
 noRun
 
 npass
 needs to set self.report More...
 
 nThreads
 
 report
 wrap up #### More...
 
 retStep
 
 stat
 
 status
 
 wf
 
 wfDir
 

Detailed Description

Definition at line 9 of file WorkFlowRunner.py.

Constructor & Destructor Documentation

def WorkFlowRunner.WorkFlowRunner.__init__ (   self,
  wf,
  noRun = False,
  dryRun = False,
  cafVeto = True,
  dasOptions = "",
  jobReport = False,
  nThreads = 1,
  maxSteps = 9999 
)

Definition at line 10 of file WorkFlowRunner.py.

10  def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1, maxSteps=9999):
11  Thread.__init__(self)
12  self.wf = wf
13 
14  self.status=-1
15  self.report=''
16  self.nfail=0
17  self.npass=0
18  self.noRun=noRun
19  self.dryRun=dryRun
20  self.cafVeto=cafVeto
21  self.dasOptions=dasOptions
22  self.jobReport=jobReport
23  self.nThreads=nThreads
24  self.maxSteps = maxSteps
25 
26  self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
27  return
28 
def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
npass
needs to set self.report

Member Function Documentation

def WorkFlowRunner.WorkFlowRunner.doCmd (   self,
  cmd 
)

Definition at line 29 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, and WorkFlowRunner.WorkFlowRunner.wfDir.

29  def doCmd(self, cmd):
30 
31  msg = "\n# in: " +os.getcwd()
32  if self.dryRun: msg += " dryRun for '"
33  else: msg += " going to execute "
34  msg += cmd.replace(';','\n')
35  print msg
36 
37  cmdLog = open(self.wfDir+'/cmdLog','a')
38  cmdLog.write(msg+'\n')
39  cmdLog.close()
40 
41  ret = 0
42  if not self.dryRun:
43  p = Popen(cmd, shell=True)
44  ret = os.waitpid(p.pid, 0)[1]
45  if ret != 0:
46  print "ERROR executing ",cmd,'ret=', ret
47 
48  return ret
49 
def WorkFlowRunner.WorkFlowRunner.run (   self)

Definition at line 50 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, runall.testit.nfail, WorkFlowRunner.WorkFlowRunner.nfail, runall.testit.npass, WorkFlowRunner.WorkFlowRunner.npass, TShapeAnalysis.npass, and WorkFlowRunner.WorkFlowRunner.wfDir.

Referenced by Types.EventID.cppID(), and Types.LuminosityBlockID.cppID().

50  def run(self):
51 
52  startDir = os.getcwd()
53 
54  if not os.path.exists(self.wfDir):
55  os.makedirs(self.wfDir)
56  elif not self.dryRun: # clean up to allow re-running in the same overall devel area, then recreate the dir to make sure it exists
57  print "cleaning up ", self.wfDir, ' in ', os.getcwd()
58  shutil.rmtree(self.wfDir)
59  os.makedirs(self.wfDir)
60 
61  preamble = 'cd '+self.wfDir+'; '
62 
63  realstarttime = datetime.now()
64  startime='date %s' %time.asctime()
65 
66  # check where we are running:
67  onCAF = False
68  if 'cms/caf/cms' in os.environ['CMS_PATH']:
69  onCAF = True
70 
71  ##needs to set
72  #self.report
73  self.npass = []
74  self.nfail = []
75  self.stat = []
76  self.retStep = []
77 
78  def closeCmd(i,ID):
79  return ' > %s 2>&1; ' % ('step%d_'%(i,)+ID+'.log ',)
80 
81  inFile=None
82  lumiRangeFile=None
83  aborted=False
84  for (istepmone,com) in enumerate(self.wf.cmds):
85  # isInputOk is used to keep track of the das result. In case this
86  # is False we use a different error message to indicate the failed
87  # das query.
88  isInputOk=True
89  istep=istepmone+1
90  cmd = preamble
91  if aborted:
92  self.npass.append(0)
93  self.nfail.append(0)
94  self.retStep.append(0)
95  self.stat.append('NOTRUN')
96  continue
97  if not isinstance(com,str):
98  if self.cafVeto and (com.location == 'CAF' and not onCAF):
99  print "You need to be no CAF to run",self.wf.numId
100  self.npass.append(0)
101  self.nfail.append(0)
102  self.retStep.append(0)
103  self.stat.append('NOTRUN')
104  aborted=True
105  continue
106  #create lumiRange file first so if das fails we get its error code
107  cmd2 = com.lumiRanges()
108  if cmd2:
109  cmd2 =cmd+cmd2+closeCmd(istep,'lumiRanges')
110  lumiRangeFile='step%d_lumiRanges.log'%(istep,)
111  retStep = self.doCmd(cmd2)
112  if (com.dataSetParent):
113  cmd3=cmd+com.das(self.dasOptions,com.dataSetParent)+closeCmd(istep,'dasparentquery')
114  retStep = self.doCmd(cmd3)
115  cmd+=com.das(self.dasOptions,com.dataSet)
116  cmd+=closeCmd(istep,'dasquery')
117  retStep = self.doCmd(cmd)
118  #don't use the file list executed, but use the das command of cmsDriver for next step
119  # If the das output is not there or it's empty, consider it an
120  # issue of this step, not of the next one.
121  dasOutputPath = join(self.wfDir, 'step%d_dasquery.log'%(istep,))
122  if not exists(dasOutputPath):
123  retStep = 1
124  dasOutput = None
125  else:
126  # We consider only the files which have at least one logical filename
127  # in it. This is because sometimes das fails and still prints out junk.
128  dasOutput = [l for l in open(dasOutputPath).read().split("\n") if l.startswith("/")]
129  if not dasOutput:
130  retStep = 1
131  isInputOk = False
132 
133  inFile = 'filelist:' + basename(dasOutputPath)
134  print "---"
135  else:
136  #chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
137  cmd += com
138  if self.noRun:
139  cmd +=' --no_exec'
140  if inFile: #in case previous step used DAS query (either filelist of das:)
141  cmd += ' --filein '+inFile
142  inFile=None
143  if lumiRangeFile: #DAS query can also restrict lumi range
144  cmd += ' --lumiToProcess '+lumiRangeFile
145  lumiRangeFile=None
146  # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
147  if 'HARVESTING' in cmd and not 134==self.wf.numId and not '--filein' in cmd:
148  cmd+=' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
149  else:
150  if istep!=1 and not '--filein' in cmd:
151  cmd+=' --filein file:step%s.root '%(istep-1,)
152  if not '--fileout' in com:
153  cmd+=' --fileout file:step%s.root '%(istep,)
154  if self.jobReport:
155  cmd += ' --suffix "-j JobReport%s.xml " ' % istep
156  if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
157  cmd += ' --nThreads %s' % self.nThreads
158  cmd+=closeCmd(istep,self.wf.nameId)
159  retStep = 0
160  if istep>self.maxSteps:
161  wf_stats = open("%s/wf_steps.txt" % self.wfDir,"a")
162  wf_stats.write('step%s:%s\n' % (istep, cmd))
163  wf_stats.close()
164  else: retStep = self.doCmd(cmd)
165 
166  self.retStep.append(retStep)
167  if retStep == 32000:
168  # A timeout occurred
169  self.npass.append(0)
170  self.nfail.append(1)
171  self.stat.append('TIMEOUT')
172  aborted = True
173  elif (retStep!=0):
174  #error occured
175  self.npass.append(0)
176  self.nfail.append(1)
177  if not isInputOk:
178  self.stat.append("DAS_ERROR")
179  else:
180  self.stat.append('FAILED')
181  #to skip processing
182  aborted=True
183  else:
184  #things went fine
185  self.npass.append(1)
186  self.nfail.append(0)
187  self.stat.append('PASSED')
188 
189  os.chdir(startDir)
190  endtime='date %s' %time.asctime()
191  tottime='%s-%s'%(endtime,startime)
192 
193 
194  #### wrap up ####
195 
196  logStat=''
197  for i,s in enumerate(self.stat):
198  logStat+='Step%d-%s '%(i,s)
199  self.report='%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+' '.join(map(str,self.retStep))+'\n'
200 
201  return
202 

Member Data Documentation

WorkFlowRunner.WorkFlowRunner.cafVeto

Definition at line 20 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dasOptions

Definition at line 21 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dryRun
WorkFlowRunner.WorkFlowRunner.jobReport

Definition at line 22 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.maxSteps

Definition at line 24 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.nfail

Definition at line 16 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.noRun

Definition at line 18 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.npass

needs to set self.report

Definition at line 17 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.nThreads

Definition at line 23 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.report

wrap up ####

Definition at line 15 of file WorkFlowRunner.py.

Referenced by dataset.Dataset.getPrimaryDatasetEntries(), and addOnTests.testit.run().

WorkFlowRunner.WorkFlowRunner.retStep

Definition at line 76 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.stat

Definition at line 75 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.status
WorkFlowRunner.WorkFlowRunner.wf

Definition at line 12 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wfDir