CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
WorkFlowRunner.WorkFlowRunner Class Reference
Inheritance diagram for WorkFlowRunner.WorkFlowRunner:

Public Member Functions

def __init__ (self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
 
def doCmd (self, cmd)
 
def run (self)
 

Public Attributes

 cafVeto
 
 dasOptions
 
 dryRun
 
 jobReport
 
 maxSteps
 
 nfail
 
 noRun
 
 npass
 Per-step pass flags; run() replaces the initial 0 with a list holding 1 for each passed step and 0 otherwise. More...
 
 nThreads
 
 report
 Summary line for the workflow, assembled in the wrap-up section at the end of run(). More...
 
 retStep
 
 stat
 
 status
 
 wf
 
 wfDir
 

Detailed Description

Definition at line 10 of file WorkFlowRunner.py.

Constructor & Destructor Documentation

def WorkFlowRunner.WorkFlowRunner.__init__ (   self,
  wf,
  noRun = False,
  dryRun = False,
  cafVeto = True,
  dasOptions = "",
  jobReport = False,
  nThreads = 1,
  maxSteps = 9999 
)

Definition at line 11 of file WorkFlowRunner.py.

def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999):
    """Set up a runner thread for a single workflow.

    Args:
        wf: workflow object. ``numId`` and ``nameId`` are read here to
            build the working-directory name; ``run`` reads ``cmds``.
        noRun: when true, ``run`` appends ``--no_exec`` to step commands.
        dryRun: when true, ``doCmd`` logs commands without executing them.
        cafVeto: when true, ``run`` refuses steps whose location is
            'CAF' unless it detects that it is running on CAF.
        dasOptions: options handed to the ``das`` method of input steps.
        jobReport: when true, ``run`` asks each step for a job report.
        nThreads: when greater than 1, ``run`` passes ``--nThreads`` to
            steps that are not harvesting steps.
        maxSteps: steps numbered above this value are written to
            ``wf_steps.txt`` by ``run`` instead of being executed.
    """
    Thread.__init__(self)
    self.wf = wf

    self.status = -1
    self.report = ''
    # run() replaces these two counters with per-step lists.
    self.nfail = 0
    self.npass = 0
    # Created here as well as in run() so that the attributes exist on a
    # runner that has not been run yet.
    self.stat = []
    self.retStep = []

    self.noRun = noRun
    self.dryRun = dryRun
    self.cafVeto = cafVeto
    self.dasOptions = dasOptions
    self.jobReport = jobReport
    self.nThreads = nThreads
    self.maxSteps = maxSteps

    self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId
    return
29 
def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
npass
needs to set self.report
#define str(s)

Member Function Documentation

def WorkFlowRunner.WorkFlowRunner.doCmd (   self,
  cmd 
)

Definition at line 30 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, edm.print(), and WorkFlowRunner.WorkFlowRunner.wfDir.

def doCmd(self, cmd):
    """Log *cmd* and, unless this is a dry run, execute it through the shell.

    The message (current directory plus the command, with every ';'
    turned into a newline) is printed and appended to ``cmdLog`` inside
    the workflow directory.

    Args:
        cmd: shell command line to run.

    Returns:
        0 for a dry run; otherwise the status value that ``os.waitpid``
        reports for the child process (not a decoded exit code).
    """
    msg = "\n# in: " + os.getcwd()
    if self.dryRun:
        msg += " dryRun for '"
    else:
        msg += " going to execute "
    msg += cmd.replace(';', '\n')
    print(msg)

    # The context manager closes the log even if the write fails.
    with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
        cmdLog.write(msg + '\n')

    ret = 0
    if not self.dryRun:
        p = Popen(cmd, shell=True)
        # Callers compare against the raw wait status, so it is returned
        # as-is.
        ret = os.waitpid(p.pid, 0)[1]
        if ret != 0:
            print("ERROR executing ", cmd, 'ret=', ret)

    return ret
50 
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def WorkFlowRunner.WorkFlowRunner.run (   self)

Definition at line 51 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, runall.testit.nfail, WorkFlowRunner.WorkFlowRunner.nfail, runall.testit.npass, TShapeAnalysis.npass, WorkFlowRunner.WorkFlowRunner.npass, edm.print(), and WorkFlowRunner.WorkFlowRunner.wfDir.

Referenced by Types.EventID.cppID(), and Types.LuminosityBlockID.cppID().

def run(self):
    """Run the workflow's steps in order and record how each one ended.

    For every entry of ``self.wf.cmds`` a shell command is assembled
    (prefixed with a ``cd`` into the workflow directory) and handed to
    ``self.doCmd``. Entries that are not strings are treated as input
    descriptions: their DAS query is executed and its log becomes the
    ``--filein`` of the following step. After the first failing step the
    remaining ones are marked 'NOTRUN'.

    Results are stored on the instance: ``npass``, ``nfail``, ``stat``
    and ``retStep`` hold one entry per step, and ``report`` is the
    summary line for the whole workflow.
    """
    startDir = os.getcwd()

    if not os.path.exists(self.wfDir):
        os.makedirs(self.wfDir)
    elif not self.dryRun:
        # clean up to allow re-running in the same overall devel area,
        # then recreate the dir to make sure it exists
        print("cleaning up ", self.wfDir, ' in ', os.getcwd())
        shutil.rmtree(self.wfDir)
        os.makedirs(self.wfDir)

    preamble = 'cd ' + self.wfDir + '; '

    startTime = 'date %s' % time.asctime()

    # check where we are running; a missing CMS_PATH simply means
    # "not on CAF" instead of raising KeyError
    onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

    self.npass = []
    self.nfail = []
    self.stat = []
    self.retStep = []

    def closeCmd(i, ID):
        # Redirect stdout and stderr of a step into its own log file.
        return ' > %s 2>&1; ' % ('step%d_' % (i,) + ID + '.log ',)

    def recordNotRun():
        # Bookkeeping for a step that is skipped entirely.
        self.npass.append(0)
        self.nfail.append(0)
        self.retStep.append(0)
        self.stat.append('NOTRUN')

    def recordFailure(label):
        # Bookkeeping for a step that ran but did not succeed.
        self.npass.append(0)
        self.nfail.append(1)
        self.stat.append(label)

    inFile = None
    lumiRangeFile = None
    aborted = False
    for istepmone, com in enumerate(self.wf.cmds):
        # isInputOk is used to keep track of the das result. In case this
        # is False we use a different error message to indicate the failed
        # das query.
        isInputOk = True
        istep = istepmone + 1
        cmd = preamble
        if aborted:
            recordNotRun()
            continue
        if not isinstance(com, str):
            if self.cafVeto and (com.location == 'CAF' and not onCAF):
                print("You need to be on CAF to run", self.wf.numId)
                recordNotRun()
                aborted = True
                continue
            # create lumiRange file first so if das fails we get its error code
            cmd2 = com.lumiRanges()
            if cmd2:
                cmd2 = cmd + cmd2 + closeCmd(istep, 'lumiRanges')
                lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                retStep = self.doCmd(cmd2)
            if com.dataSetParent:
                cmd3 = cmd + com.das(self.dasOptions, com.dataSetParent) + closeCmd(istep, 'dasparentquery')
                retStep = self.doCmd(cmd3)
            cmd += com.das(self.dasOptions, com.dataSet)
            cmd += closeCmd(istep, 'dasquery')
            retStep = self.doCmd(cmd)
            # don't use the file list executed, but use the das command of
            # cmsDriver for next step.
            # If the das output is not there or it's empty, consider it an
            # issue of this step, not of the next one.
            dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))
            if not exists(dasOutputPath):
                retStep = 1
                dasOutput = None
            else:
                # We consider only the files which have at least one logical
                # filename in it. This is because sometimes das fails and
                # still prints out junk.
                with open(dasOutputPath) as dasLog:
                    dasOutput = [l for l in dasLog.read().split("\n") if l.startswith("/")]
            if not dasOutput:
                retStep = 1
                isInputOk = False

            inFile = 'filelist:' + basename(dasOutputPath)
            print("---")
        else:
            # chaining IO, which should be done in WF object already and
            # not using stepX.root but <stepName>.root
            cmd += com
            if self.noRun:
                cmd += ' --no_exec'
            # in case previous step used DAS query (either filelist of das:)
            # not to be applied for premixing stage1 to allow combined
            # stage1+stage2 workflow
            if inFile and 'premix_stage1' not in cmd:
                cmd += ' --filein ' + inFile
                inFile = None
            if lumiRangeFile:  # DAS query can also restrict lumi range
                cmd += ' --lumiToProcess ' + lumiRangeFile
                lumiRangeFile = None
            # 134 is an existing workflow where harvesting has to operate on
            # AlcaReco and NOT on DQM; hard-coded..
            if 'HARVESTING' in cmd and not 134 == self.wf.numId and '--filein' not in cmd:
                cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep - 1, istep)
            else:
                # Disable input for premix stage1 to allow combined
                # stage1+stage2 workflow.
                # Disable input for premix stage2 in FastSim to allow combined
                # stage1+stage2 workflow (in FS, stage2 does also GEN).
                # Ugly hack but works
                if istep != 1 and '--filein' not in cmd and 'premix_stage1' not in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
                    cmd += ' --filein file:step%s.root ' % (istep - 1,)
                if '--fileout' not in com:
                    cmd += ' --fileout file:step%s.root ' % (istep,)
            if self.jobReport:
                cmd += ' --suffix "-j JobReport%s.xml " ' % istep
            if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                cmd += ' --nThreads %s' % self.nThreads
            cmd += closeCmd(istep, self.wf.nameId)
            retStep = 0
            if istep > self.maxSteps:
                # Steps beyond maxSteps are only written down, not executed.
                with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                    wf_stats.write('step%s:%s\n' % (istep, cmd))
            else:
                retStep = self.doCmd(cmd)

        self.retStep.append(retStep)
        if retStep == 32000:
            # A timeout occurred
            recordFailure('TIMEOUT')
            aborted = True
        elif retStep != 0:
            # error occurred; skip processing of the remaining steps
            recordFailure('FAILED' if isInputOk else "DAS_ERROR")
            aborted = True
        else:
            # things went fine
            self.npass.append(1)
            self.nfail.append(0)
            self.stat.append('PASSED')

    os.chdir(startDir)
    endTime = 'date %s' % time.asctime()
    tottime = '%s-%s' % (endTime, startTime)

    #### wrap up ####

    # The step index in the report is zero-based, unlike the log file names.
    logStat = ''.join('Step%d-%s ' % (i, s) for i, s in enumerate(self.stat))
    self.report = '%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime) + ' '.join(map(str, self.retStep)) + '\n'

    return
208 
209 
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
npass
needs to set self.report
double split
Definition: MVATrainer.cc:139

Member Data Documentation

WorkFlowRunner.WorkFlowRunner.cafVeto

Definition at line 21 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dasOptions

Definition at line 22 of file WorkFlowRunner.py.

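WorkFlowRunner.WorkFlowRunner.dryRun

Definition at line 20 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.doCmd(), and WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.jobReport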

Definition at line 23 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.maxSteps

Definition at line 25 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.nfail

Definition at line 17 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.noRun

Definition at line 19 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.npass

Per-step pass flags: run() replaces the initial 0 with a list holding 1 for each step that passed and 0 otherwise.

Definition at line 18 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.nThreads

Definition at line 24 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.report

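Summary line for the workflow (ids, per-step status, timing and exit codes), assembled in the wrap-up section at the end of run().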

Definition at line 16 of file WorkFlowRunner.py.

Referenced by dataset.Dataset.getPrimaryDatasetEntries(), and addOnTests.testit.run().

WorkFlowRunner.WorkFlowRunner.retStep

Definition at line 77 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.stat

Definition at line 76 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.status

Definition at line 15 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wf

Definition at line 13 of file WorkFlowRunner.py.

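WorkFlowRunner.WorkFlowRunner.wfDir

Definition at line 27 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.doCmd(), and WorkFlowRunner.WorkFlowRunner.run().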