CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Public Attributes
WorkFlowRunner.WorkFlowRunner Class Reference
Inheritance diagram for WorkFlowRunner.WorkFlowRunner:

Public Member Functions

def __init__
 
def doCmd
 
def run
 

Public Attributes

 cafVeto
 
 dasOptions
 
 dryRun
 
 jobReport
 
 nfail
 
 noRun
 
 npass
 needs to set self.report More...
 
 nThreads
 
 report
 wrap up #### More...
 
 retStep
 
 stat
 
 status
 
 wf
 
 wfDir
 

Detailed Description

Definition at line 89 of file WorkFlowRunner.py.

Constructor & Destructor Documentation

def WorkFlowRunner.WorkFlowRunner.__init__ (   self,
  wf,
  noRun = False,
  dryRun = False,
  cafVeto = True,
  dasOptions = "",
  jobReport = False,
  nThreads = 1 
)

Definition at line 90 of file WorkFlowRunner.py.

90 
91  def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1):
92  Thread.__init__(self)
93  self.wf = wf
94 
95  self.status=-1
96  self.report=''
97  self.nfail=0
98  self.npass=0
99  self.noRun=noRun
100  self.dryRun=dryRun
101  self.cafVeto=cafVeto
102  self.dasOptions=dasOptions
103  self.jobReport=jobReport
104  self.nThreads=nThreads
105 
106  self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
107  return
npass
needs to set self.report

Member Function Documentation

def WorkFlowRunner.WorkFlowRunner.doCmd (   self,
  cmd 
)

Definition at line 108 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, and WorkFlowRunner.WorkFlowRunner.wfDir.

109  def doCmd(self, cmd):
110 
111  msg = "\n# in: " +os.getcwd()
112  if self.dryRun: msg += " dryRun for '"
113  else: msg += " going to execute "
114  msg += cmd.replace(';','\n')
115  print msg
116 
117  cmdLog = open(self.wfDir+'/cmdLog','a')
118  cmdLog.write(msg+'\n')
119  cmdLog.close()
120 
121  ret = 0
122  if not self.dryRun:
123  p = Popen(cmd, shell=True)
124  ret = os.waitpid(p.pid, 0)[1]
125  if ret != 0:
126  print "ERROR executing ",cmd,'ret=', ret
127 
128  return ret
def WorkFlowRunner.WorkFlowRunner.run (   self)

Definition at line 129 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, runall.testit.nfail, WorkFlowRunner.WorkFlowRunner.nfail, runall.testit.npass, TShapeAnalysis.npass, WorkFlowRunner.WorkFlowRunner.npass, and WorkFlowRunner.WorkFlowRunner.wfDir.

Referenced by Types.EventID.cppID(), and Types.LuminosityBlockID.cppID().

130  def run(self):
131 
132  startDir = os.getcwd()
133 
134  if not os.path.exists(self.wfDir):
135  os.makedirs(self.wfDir)
136  elif not self.dryRun: # clean up to allow re-running in the same overall devel area, then recreate the dir to make sure it exists
137  print "cleaning up ", self.wfDir, ' in ', os.getcwd()
138  shutil.rmtree(self.wfDir)
139  os.makedirs(self.wfDir)
140 
141  preamble = 'cd '+self.wfDir+'; '
142 
143  realstarttime = datetime.now()
144  startime='date %s' %time.asctime()
145 
146  # check where we are running:
147  onCAF = False
148  if 'cms/caf/cms' in os.environ['CMS_PATH']:
149  onCAF = True
150 
151  ##needs to set
152  #self.report
153  self.npass = []
154  self.nfail = []
155  self.stat = []
156  self.retStep = []
157 
158  def closeCmd(i,ID):
159  return ' > %s 2>&1; ' % ('step%d_'%(i,)+ID+'.log ',)
160 
161  inFile=None
162  lumiRangeFile=None
163  aborted=False
164  for (istepmone,com) in enumerate(self.wf.cmds):
165  # isInputOk is used to keep track of the das result. In case this
166  # is False we use a different error message to indicate the failed
167  # das query.
168  isInputOk=True
169  istep=istepmone+1
170  cmd = preamble
171  if aborted:
172  self.npass.append(0)
173  self.nfail.append(0)
174  self.retStep.append(0)
175  self.stat.append('NOTRUN')
176  continue
177  if not isinstance(com,str):
178  if self.cafVeto and (com.location == 'CAF' and not onCAF):
179  print "You need to be no CAF to run",self.wf.numId
180  self.npass.append(0)
181  self.nfail.append(0)
182  self.retStep.append(0)
183  self.stat.append('NOTRUN')
184  aborted=True
185  continue
186  #create lumiRange file first so if das fails we get its error code
187  cmd2 = com.lumiRanges()
188  if cmd2:
189  cmd2 =cmd+cmd2+closeCmd(istep,'lumiRanges')
190  lumiRangeFile='step%d_lumiRanges.log'%(istep,)
191  retStep = self.doCmd(cmd2)
192  cmd+=com.das(self.dasOptions)
193  cmd+=closeCmd(istep,'dasquery')
194  retStep = self.doCmd(cmd)
195  #don't use the file list executed, but use the das command of cmsDriver for next step
196  # If the das output is not there or it's empty, consider it an
197  # issue of this step, not of the next one.
198  dasOutputPath = join(self.wfDir, 'step%d_dasquery.log'%(istep,))
199  if not exists(dasOutputPath):
200  retStep = 1
201  dasOutput = None
202  else:
203  # We consider only the files which have at least one logical filename
204  # in it. This is because sometimes das fails and still prints out junk.
205  dasOutput = [l for l in open(dasOutputPath).read().split("\n") if l.startswith("/")]
206  if not dasOutput:
207  retStep = 1
208  isInputOk = False
209 
210  inFile = 'filelist:' + basename(dasOutputPath)
211  print "---"
212  else:
213  #chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
214  cmd += com
215  if self.noRun:
216  cmd +=' --no_exec'
217  if inFile: #in case previous step used DAS query (either filelist of das:)
218  cmd += ' --filein '+inFile
219  inFile=None
220  if lumiRangeFile: #DAS query can also restrict lumi range
221  cmd += ' --lumiToProcess '+lumiRangeFile
222  lumiRangeFile=None
223  # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
224  if 'HARVESTING' in cmd and not 134==self.wf.numId and not '--filein' in cmd:
225  cmd+=' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
226  else:
227  if istep!=1 and not '--filein' in cmd:
228  cmd+=' --filein file:step%s.root '%(istep-1,)
229  if not '--fileout' in com:
230  cmd+=' --fileout file:step%s.root '%(istep,)
231  if self.jobReport:
232  cmd += ' --suffix "-j JobReport%s.xml " ' % istep
233  if (self.nThreads > 1) and ('HARVESTING' not in cmd) :
234  cmd += ' --nThreads %s' % self.nThreads
235  cmd+=closeCmd(istep,self.wf.nameId)
236 
237  esReportWorkflow(workflow=self.wf.nameId,
238  release=getenv("CMSSW_VERSION"),
239  architecture=getenv("SCRAM_ARCH"),
240  step=istep,
241  command=cmd,
242  status="STARTED",
243  start_time=realstarttime.isoformat(),
244  workflow_id=self.wf.numId)
245  retStep = self.doCmd(cmd)
246 
247 
248 
249  self.retStep.append(retStep)
250  if retStep == 32000:
251  # A timeout occurred
252  self.npass.append(0)
253  self.nfail.append(1)
254  self.stat.append('TIMEOUT')
255  aborted = True
256  elif (retStep!=0):
257  #error occured
258  self.npass.append(0)
259  self.nfail.append(1)
260  if not isInputOk:
261  self.stat.append("DAS_ERROR")
262  else:
263  self.stat.append('FAILED')
264  #to skip processing
265  aborted=True
266  else:
267  #things went fine
268  self.npass.append(1)
269  self.nfail.append(0)
270  self.stat.append('PASSED')
271 
272  esReportWorkflow(workflow=self.wf.nameId,
273  release=getenv("CMSSW_VERSION"),
274  architecture=getenv("SCRAM_ARCH"),
275  step=istep,
276  command=cmd,
277  status=self.stat[-1],
278  start_time=realstarttime.isoformat(),
279  end_time=datetime.now().isoformat(),
280  delta_time=(datetime.now() - realstarttime).seconds,
281  workflow_id=self.wf.numId,
282  log_file="%s/step%d_%s.log" % (self.wfDir, istep, self.wf.nameId))
283 
284  os.chdir(startDir)
285 
286  endtime='date %s' %time.asctime()
287  tottime='%s-%s'%(endtime,startime)
288 
289 
290  #### wrap up ####
291 
292  logStat=''
293  for i,s in enumerate(self.stat):
294  logStat+='Step%d-%s '%(i,s)
295  self.report='%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+' '.join(map(str,self.retStep))+'\n'
296 
297  return
298 
299 
300 
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
npass
needs to set self.report
double split
Definition: MVATrainer.cc:139

Member Data Documentation

WorkFlowRunner.WorkFlowRunner.cafVeto

Definition at line 100 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dasOptions

Definition at line 101 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dryRun

Definition at line 99 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.doCmd(), and WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.jobReport

Definition at line 102 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.nfail

Definition at line 96 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.noRun

Definition at line 98 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.npass

needs to set self.report

Definition at line 97 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.nThreads

Definition at line 103 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.report

wrap up ####

Definition at line 95 of file WorkFlowRunner.py.

Referenced by dataset.Dataset.getPrimaryDatasetEntries(), and addOnTests.testit.run().

WorkFlowRunner.WorkFlowRunner.retStep

Definition at line 155 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.stat

Definition at line 154 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.status

Definition at line 94 of file WorkFlowRunner.py.

Referenced by dirstructure.Comparison.__make_image(), and dirstructure.Comparison.__repr__().

WorkFlowRunner.WorkFlowRunner.wf

Definition at line 92 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wfDir

Definition at line 105 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.doCmd(), and WorkFlowRunner.WorkFlowRunner.run().