CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
WorkFlowRunner.WorkFlowRunner Class Reference
Inheritance diagram for WorkFlowRunner.WorkFlowRunner:

Public Member Functions

def __init__ (self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
 
def doCmd (self, cmd)
 
def run (self)
 

Public Attributes

 cafVeto
 
 dasOptions
 
 dryRun
 
 jobReport
 
 maxSteps
 
 nfail
 
 noRun
 
 npass
 needs to set self.report More...
 
 nThreads
 
 report
 wrap up #### More...
 
 retStep
 
 stat
 
 status
 
 wf
 
 wfDir
 

Detailed Description

Definition at line 10 of file WorkFlowRunner.py.

Constructor & Destructor Documentation

def WorkFlowRunner.WorkFlowRunner.__init__ (   self,
  wf,
  noRun = False,
  dryRun = False,
  cafVeto = True,
  dasOptions = "",
  jobReport = False,
  nThreads = 1,
  maxSteps = 9999 
)

Definition at line 11 of file WorkFlowRunner.py.

def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999):
    """Store the workflow and the run options; nothing is executed here.

    Parameters
    ----------
    wf : workflow object; this class reads ``wf.numId``, ``wf.nameId`` and
        ``wf.cmds`` from it.
    noRun : when true, run() appends ``--no_exec`` to every string step.
    dryRun : when true, doCmd() only logs the commands and does not execute them.
    cafVeto : when true, run() refuses steps whose ``location`` is ``'CAF'``
        unless it detects that it is running on CAF.
    dasOptions : passed through to ``com.das(...)`` for DAS-query steps.
    jobReport : when true, run() adds a ``--suffix "-j JobReport<N>.xml "`` option.
    nThreads : when > 1, run() adds ``--nThreads`` to non-harvesting steps.
    maxSteps : steps with a number above this are only written to
        ``wf_steps.txt`` by run(), not executed.
    """
    Thread.__init__(self)
    self.wf = wf

    self.status = -1
    self.report = ''
    # NOTE: these start as the integer 0 (kept for backward compatibility);
    # run() rebinds both to per-step lists.
    self.nfail = 0
    self.npass = 0
    # Per-step status strings and return codes, filled by run(). Created here
    # so that reading them before (or without) run() does not raise
    # AttributeError.
    self.stat = []
    self.retStep = []

    self.noRun = noRun
    self.dryRun = dryRun
    self.cafVeto = cafVeto
    self.dasOptions = dasOptions
    self.jobReport = jobReport
    self.nThreads = nThreads
    self.maxSteps = maxSteps

    # working directory of this workflow: "<numId>_<nameId>"
    self.wfDir = str(self.wf.numId)+'_'+self.wf.nameId
29 
def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
npass
needs to set self.report
#define str(s)

Member Function Documentation

def WorkFlowRunner.WorkFlowRunner.doCmd (   self,
  cmd 
)

Definition at line 30 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, edm.print(), and WorkFlowRunner.WorkFlowRunner.wfDir.

def doCmd(self, cmd):
    """Log a shell command and, unless in dry-run mode, execute it.

    The message (current directory, mode, and the command with every ';'
    replaced by a newline) is printed and appended to ``<wfDir>/cmdLog``.

    Parameters
    ----------
    cmd : shell command line; it is run through the shell (``shell=True``).

    Returns
    -------
    0 in dry-run mode; otherwise the second element of ``os.waitpid()`` for
    the spawned process, i.e. the raw wait status rather than a decoded
    exit code.
    """
    msg = "\n# in: " + os.getcwd()
    if self.dryRun:
        msg += " dryRun for '"
    else:
        msg += " going to execute "
    msg += cmd.replace(';', '\n')
    print(msg)

    # 'with' guarantees the log handle is closed even if the write fails
    with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
        cmdLog.write(msg + '\n')

    ret = 0
    if not self.dryRun:
        p = Popen(cmd, shell=True)
        # keep os.waitpid (not p.wait()): callers receive the raw status word
        ret = os.waitpid(p.pid, 0)[1]
        if ret != 0:
            print("ERROR executing ", cmd, 'ret=', ret)

    return ret
50 
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def WorkFlowRunner.WorkFlowRunner.run (   self)

Definition at line 51 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, runall.testit.nfail, WorkFlowRunner.WorkFlowRunner.nfail, runall.testit.npass, TShapeAnalysis.npass, WorkFlowRunner.WorkFlowRunner.npass, edm.print(), and WorkFlowRunner.WorkFlowRunner.wfDir.

Referenced by Types.EventID.cppID(), and Types.LuminosityBlockID.cppID().

def run(self):
    """Run every step of ``self.wf.cmds`` in order and build ``self.report``.

    For each step one entry is appended to ``self.npass``, ``self.nfail``,
    ``self.retStep`` and ``self.stat`` (``'PASSED'``, ``'FAILED'``,
    ``'DAS_ERROR'``, ``'TIMEOUT'`` or ``'NOTRUN'``). After the first failing
    step all remaining steps are recorded as ``'NOTRUN'``.

    A step that is not a ``str`` is treated as an input (DAS) query object:
    its ``lumiRanges()``, ``das()``, ``dataSet``, ``dataSetParent`` and
    ``location`` members are used. A ``str`` step is a command line that is
    completed with input/output/threading options before execution.

    Commands are executed through ``self.doCmd``; the working directory
    ``self.wfDir`` is created (or wiped and recreated when not in dry-run
    mode) first.
    """
    startDir = os.getcwd()

    if not os.path.exists(self.wfDir):
        os.makedirs(self.wfDir)
    elif not self.dryRun:
        # clean up to allow re-running in the same overall devel area,
        # then recreate the dir to make sure it exists
        print("cleaning up ", self.wfDir, ' in ', os.getcwd())
        shutil.rmtree(self.wfDir)
        os.makedirs(self.wfDir)

    # every command is executed from inside the workflow directory
    preamble = 'cd '+self.wfDir+'; '

    startime = 'date %s' % time.asctime()

    # check where we are running; an unset CMS_PATH simply means "not on CAF"
    # instead of aborting the whole run with a KeyError
    onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

    # per-step bookkeeping; self.report is set at the very end
    self.npass = []
    self.nfail = []
    self.stat = []
    self.retStep = []

    def closeCmd(i, ID):
        # redirect stdout+stderr of the step to step<i>_<ID>.log
        return ' > %s 2>&1; ' % ('step%d_' % (i,)+ID+'.log ',)

    def markNotRun():
        # record a step that was skipped
        self.npass.append(0)
        self.nfail.append(0)
        self.retStep.append(0)
        self.stat.append('NOTRUN')

    inFile = None
    lumiRangeFile = None
    aborted = False
    for (istepmone, com) in enumerate(self.wf.cmds):
        # isInputOk is used to keep track of the das result. In case this
        # is False we use a different error message to indicate the failed
        # das query.
        isInputOk = True
        istep = istepmone+1
        cmd = preamble
        if aborted:
            markNotRun()
            continue
        if not isinstance(com, str):
            if self.cafVeto and (com.location == 'CAF' and not onCAF):
                print("You need to be on CAF to run", self.wf.numId)
                markNotRun()
                aborted = True
                continue
            # create lumiRange file first so if das fails we get its error code
            cmd2 = com.lumiRanges()
            if cmd2:
                cmd2 = cmd+cmd2+closeCmd(istep, 'lumiRanges')
                lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                retStep = self.doCmd(cmd2)
            if com.dataSetParent:
                cmd3 = cmd+com.das(self.dasOptions, com.dataSetParent)+closeCmd(istep, 'dasparentquery')
                retStep = self.doCmd(cmd3)
            cmd += com.das(self.dasOptions, com.dataSet)
            cmd += closeCmd(istep, 'dasquery')
            # the status of the main das query overrides the earlier ones
            retStep = self.doCmd(cmd)
            # don't use the file list executed, but use the das command of cmsDriver for next step
            # If the das output is not there or it's empty, consider it an
            # issue of this step, not of the next one.
            dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))
            # Check created das output in no-dryRun mode only
            if not self.dryRun:
                if not exists(dasOutputPath):
                    retStep = 1
                    dasOutput = None
                else:
                    # We consider only the files which have at least one logical filename
                    # in it. This is because sometimes das fails and still prints out junk.
                    with open(dasOutputPath) as dasLog:
                        dasOutput = [l for l in dasLog.read().split("\n") if l.startswith("/")]
                if not dasOutput:
                    retStep = 1
                    isInputOk = False

            inFile = 'filelist:' + basename(dasOutputPath)
            print("---")
        else:
            # chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
            cmd += com
            if self.noRun:
                cmd += ' --no_exec'
            # in case previous step used DAS query (either filelist of das:)
            # not to be applied for premixing stage1 to allow combined stage1+stage2 workflow
            if inFile and not 'premix_stage1' in cmd:
                cmd += ' --filein '+inFile
                inFile = None
            if lumiRangeFile:  # DAS query can also restrict lumi range
                cmd += ' --lumiToProcess '+lumiRangeFile
                lumiRangeFile = None
            # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
            if 'HARVESTING' in cmd and not 134 == self.wf.numId and not '--filein' in cmd:
                cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep-1, istep)
            else:
                # Disable input for premix stage1 to allow combined stage1+stage2 workflow
                # Disable input for premix stage2 in FastSim to allow combined stage1+stage2 workflow (in FS, stage2 does also GEN)
                # Ugly hack but works
                if istep != 1 and not '--filein' in cmd and not 'premix_stage1' in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
                    cmd += ' --filein file:step%s.root ' % (istep-1,)
                if not '--fileout' in com:
                    cmd += ' --fileout file:step%s.root ' % (istep,)
            if self.jobReport:
                cmd += ' --suffix "-j JobReport%s.xml " ' % istep
            if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                cmd += ' --nThreads %s' % self.nThreads
            cmd += closeCmd(istep, self.wf.nameId)
            retStep = 0
            if istep > self.maxSteps:
                # steps beyond maxSteps are only recorded, not executed
                with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                    wf_stats.write('step%s:%s\n' % (istep, cmd))
            else:
                retStep = self.doCmd(cmd)

        self.retStep.append(retStep)
        if retStep == 32000:
            # A timeout occurred
            self.npass.append(0)
            self.nfail.append(1)
            self.stat.append('TIMEOUT')
            aborted = True
        elif retStep != 0:
            # error occurred
            self.npass.append(0)
            self.nfail.append(1)
            if not isInputOk:
                self.stat.append("DAS_ERROR")
            else:
                self.stat.append('FAILED')
            # to skip processing
            aborted = True
        else:
            # things went fine
            self.npass.append(1)
            self.nfail.append(0)
            self.stat.append('PASSED')

    os.chdir(startDir)
    endtime = 'date %s' % time.asctime()
    tottime = '%s-%s' % (endtime, startime)

    #### wrap up ####

    # NOTE: the report numbers steps from 0, unlike the step<N> log files
    logStat = ''.join('Step%d-%s ' % (i, s) for i, s in enumerate(self.stat))
    self.report = '%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime)+' '.join(map(str, self.retStep))+'\n'
210 
211 
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
npass
needs to set self.report
double split
Definition: MVATrainer.cc:139

Member Data Documentation

WorkFlowRunner.WorkFlowRunner.cafVeto

Definition at line 21 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dasOptions

Definition at line 22 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dryRun

Definition at line 20 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.jobReport

Definition at line 23 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.maxSteps

Definition at line 25 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.nfail

Definition at line 17 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.noRun

Definition at line 19 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.npass

needs to set self.report

Definition at line 18 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.nThreads

Definition at line 24 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.report

wrap up ####

Definition at line 16 of file WorkFlowRunner.py.

Referenced by dataset.Dataset.getPrimaryDatasetEntries(), and addOnTests.testit.run().

WorkFlowRunner.WorkFlowRunner.retStep

Definition at line 77 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.stat

Definition at line 76 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.status

Definition at line 15 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wf

Definition at line 13 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wfDir