CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
WorkFlowRunner.WorkFlowRunner Class Reference
Inheritance diagram for WorkFlowRunner.WorkFlowRunner:

Public Member Functions

def __init__ (self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1)
 
def doCmd (self, cmd)
 
def run (self)
 

Public Attributes

 cafVeto
 
 dasOptions
 
 dryRun
 
 jobReport
 
 nfail
 
 noRun
 
 npass
 Pass count (an int initialized to 0); run() resets it to a per-step list of 0/1 pass flags. More...
 
 nThreads
 
 report
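 One-line result summary (workflow id and name, per-step status, timing, and per-step exit codes) built at the end of run(). More...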
 
 retStep
 
 stat
 
 status
 
 wf
 
 wfDir
 

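Detailed Description

A Thread subclass that runs each command of a workflow (wf.cmds) inside the workflow's own working directory (wfDir). It records a per-step status (PASSED, FAILED, DAS_ERROR, TIMEOUT or NOTRUN) and summarizes the outcome in report. After the first failing or vetoed step, the remaining steps are marked NOTRUN.

Definition at line 89 of file WorkFlowRunner.py.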

Constructor & Destructor Documentation

def WorkFlowRunner.WorkFlowRunner.__init__ (   self,
  wf,
  noRun = False,
  dryRun = False,
  cafVeto = True,
  dasOptions = "",
  jobReport = False,
  nThreads = 1 
)

Definition at line 90 of file WorkFlowRunner.py.

90  def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1):
91  Thread.__init__(self)
92  self.wf = wf
93 
94  self.status=-1
95  self.report=''
96  self.nfail=0
97  self.npass=0
98  self.noRun=noRun
99  self.dryRun=dryRun
100  self.cafVeto=cafVeto
101  self.dasOptions=dasOptions
102  self.jobReport=jobReport
103  self.nThreads=nThreads
104 
105  self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
106  return
107 
def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1)
npass
needs to set self.report

Member Function Documentation

def WorkFlowRunner.WorkFlowRunner.doCmd (   self,
  cmd 
)

Definition at line 108 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, and WorkFlowRunner.WorkFlowRunner.wfDir.

108  def doCmd(self, cmd):
109 
110  msg = "\n# in: " +os.getcwd()
111  if self.dryRun: msg += " dryRun for '"
112  else: msg += " going to execute "
113  msg += cmd.replace(';','\n')
114  print msg
115 
116  cmdLog = open(self.wfDir+'/cmdLog','a')
117  cmdLog.write(msg+'\n')
118  cmdLog.close()
119 
120  ret = 0
121  if not self.dryRun:
122  p = Popen(cmd, shell=True)
123  ret = os.waitpid(p.pid, 0)[1]
124  if ret != 0:
125  print "ERROR executing ",cmd,'ret=', ret
126 
127  return ret
128 
def WorkFlowRunner.WorkFlowRunner.run (   self)

Definition at line 129 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, runall.testit.nfail, WorkFlowRunner.WorkFlowRunner.nfail, runall.testit.npass, TShapeAnalysis.npass, WorkFlowRunner.WorkFlowRunner.npass, and WorkFlowRunner.WorkFlowRunner.wfDir.

Referenced by Types.EventID.cppID(), and Types.LuminosityBlockID.cppID().

129  def run(self):
130 
131  startDir = os.getcwd()
132 
133  if not os.path.exists(self.wfDir):
134  os.makedirs(self.wfDir)
135  elif not self.dryRun: # clean up to allow re-running in the same overall devel area, then recreate the dir to make sure it exists
136  print "cleaning up ", self.wfDir, ' in ', os.getcwd()
137  shutil.rmtree(self.wfDir)
138  os.makedirs(self.wfDir)
139 
140  preamble = 'cd '+self.wfDir+'; '
141 
142  realstarttime = datetime.now()
143  startime='date %s' %time.asctime()
144 
145  # check where we are running:
146  onCAF = False
147  if 'cms/caf/cms' in os.environ['CMS_PATH']:
148  onCAF = True
149 
150  ##needs to set
151  #self.report
152  self.npass = []
153  self.nfail = []
154  self.stat = []
155  self.retStep = []
156 
157  def closeCmd(i,ID):
158  return ' > %s 2>&1; ' % ('step%d_'%(i,)+ID+'.log ',)
159 
160  inFile=None
161  lumiRangeFile=None
162  aborted=False
163  for (istepmone,com) in enumerate(self.wf.cmds):
164  # isInputOk is used to keep track of the das result. In case this
165  # is False we use a different error message to indicate the failed
166  # das query.
167  isInputOk=True
168  istep=istepmone+1
169  cmd = preamble
170  if aborted:
171  self.npass.append(0)
172  self.nfail.append(0)
173  self.retStep.append(0)
174  self.stat.append('NOTRUN')
175  continue
176  if not isinstance(com,str):
177  if self.cafVeto and (com.location == 'CAF' and not onCAF):
178  print "You need to be no CAF to run",self.wf.numId
179  self.npass.append(0)
180  self.nfail.append(0)
181  self.retStep.append(0)
182  self.stat.append('NOTRUN')
183  aborted=True
184  continue
185  #create lumiRange file first so if das fails we get its error code
186  cmd2 = com.lumiRanges()
187  if cmd2:
188  cmd2 =cmd+cmd2+closeCmd(istep,'lumiRanges')
189  lumiRangeFile='step%d_lumiRanges.log'%(istep,)
190  retStep = self.doCmd(cmd2)
191  cmd+=com.das(self.dasOptions)
192  cmd+=closeCmd(istep,'dasquery')
193  retStep = self.doCmd(cmd)
194  #don't use the file list executed, but use the das command of cmsDriver for next step
195  # If the das output is not there or it's empty, consider it an
196  # issue of this step, not of the next one.
197  dasOutputPath = join(self.wfDir, 'step%d_dasquery.log'%(istep,))
198  if not exists(dasOutputPath):
199  retStep = 1
200  dasOutput = None
201  else:
202  # We consider only the files which have at least one logical filename
203  # in it. This is because sometimes das fails and still prints out junk.
204  dasOutput = [l for l in open(dasOutputPath).read().split("\n") if l.startswith("/")]
205  if not dasOutput:
206  retStep = 1
207  isInputOk = False
208 
209  inFile = 'filelist:' + basename(dasOutputPath)
210  print "---"
211  else:
212  #chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
213  cmd += com
214  if self.noRun:
215  cmd +=' --no_exec'
216  if inFile: #in case previous step used DAS query (either filelist of das:)
217  cmd += ' --filein '+inFile
218  inFile=None
219  if lumiRangeFile: #DAS query can also restrict lumi range
220  cmd += ' --lumiToProcess '+lumiRangeFile
221  lumiRangeFile=None
222  # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
223  if 'HARVESTING' in cmd and not 134==self.wf.numId and not '--filein' in cmd:
224  cmd+=' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
225  else:
226  if istep!=1 and not '--filein' in cmd:
227  cmd+=' --filein file:step%s.root '%(istep-1,)
228  if not '--fileout' in com:
229  cmd+=' --fileout file:step%s.root '%(istep,)
230  if self.jobReport:
231  cmd += ' --suffix "-j JobReport%s.xml " ' % istep
232  if (self.nThreads > 1) and ('HARVESTING' not in cmd) :
233  cmd += ' --nThreads %s' % self.nThreads
234  cmd+=closeCmd(istep,self.wf.nameId)
235 
236  esReportWorkflow(workflow=self.wf.nameId,
237  release=getenv("CMSSW_VERSION"),
238  architecture=getenv("SCRAM_ARCH"),
239  step=istep,
240  command=cmd,
241  status="STARTED",
242  start_time=realstarttime.isoformat(),
243  workflow_id=self.wf.numId)
244  retStep = self.doCmd(cmd)
245 
246 
247 
248  self.retStep.append(retStep)
249  if retStep == 32000:
250  # A timeout occurred
251  self.npass.append(0)
252  self.nfail.append(1)
253  self.stat.append('TIMEOUT')
254  aborted = True
255  elif (retStep!=0):
256  #error occured
257  self.npass.append(0)
258  self.nfail.append(1)
259  if not isInputOk:
260  self.stat.append("DAS_ERROR")
261  else:
262  self.stat.append('FAILED')
263  #to skip processing
264  aborted=True
265  else:
266  #things went fine
267  self.npass.append(1)
268  self.nfail.append(0)
269  self.stat.append('PASSED')
270 
271  esReportWorkflow(workflow=self.wf.nameId,
272  release=getenv("CMSSW_VERSION"),
273  architecture=getenv("SCRAM_ARCH"),
274  step=istep,
275  command=cmd,
276  status=self.stat[-1],
277  start_time=realstarttime.isoformat(),
278  end_time=datetime.now().isoformat(),
279  delta_time=(datetime.now() - realstarttime).seconds,
280  workflow_id=self.wf.numId,
281  log_file="%s/step%d_%s.log" % (self.wfDir, istep, self.wf.nameId))
282 
283  os.chdir(startDir)
284 
285  endtime='date %s' %time.asctime()
286  tottime='%s-%s'%(endtime,startime)
287 
288 
289  #### wrap up ####
290 
291  logStat=''
292  for i,s in enumerate(self.stat):
293  logStat+='Step%d-%s '%(i,s)
294  self.report='%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+' '.join(map(str,self.retStep))+'\n'
295 
296  return
297 
298 
299 

Member Data Documentation

WorkFlowRunner.WorkFlowRunner.cafVeto

Definition at line 100 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dasOptions

Definition at line 101 of file WorkFlowRunner.py.

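WorkFlowRunner.WorkFlowRunner.dryRun

Definition at line 99 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.doCmd(), and WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.jobReport

Definition at line 102 of file WorkFlowRunner.py.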

WorkFlowRunner.WorkFlowRunner.nfail

Definition at line 96 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.noRun

Definition at line 98 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.npass

Pass count (an int initialized to 0); run() resets it to a per-step list of 0/1 pass flags.

Definition at line 97 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.nThreads

Definition at line 103 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.report

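One-line result summary (workflow id and name, per-step status, timing, and per-step exit codes) built at the end of run().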

Definition at line 95 of file WorkFlowRunner.py.

Referenced by dataset.Dataset.getPrimaryDatasetEntries(), and addOnTests.testit.run().

WorkFlowRunner.WorkFlowRunner.retStep

Definition at line 155 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.stat

Definition at line 154 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.status

Definition at line 94 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wf

Definition at line 92 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wfDir