WorkFlowRunner.WorkFlowRunner Class Reference
Inherits Thread.

Public Member Functions

def __init__
 
def doCmd
 
def run
 

Public Attributes

 cafVeto
 
 dasOptions
 
 dryRun
 
 jobReport
 
 maxSteps
 
 nfail
 
 noRun
 
 npass
 needs to set self.report
 
 nStreams
 
 nThreads
 
 report
 Summary string assembled in the wrap-up section at the end of run().
 
 retStep
 
 stat
 
 status
 
 wf
 
 wfDir
 

Detailed Description

Definition at line 10 of file WorkFlowRunner.py.

Constructor & Destructor Documentation

def WorkFlowRunner.WorkFlowRunner.__init__ (   self,
  wf,
  noRun = False,
  dryRun = False,
  cafVeto = True,
  dasOptions = "",
  jobReport = False,
  nThreads = 1,
  nStreams = 0,
  maxSteps = 9999 
)

Definition at line 11 of file WorkFlowRunner.py.

11
12      def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1, nStreams=0, maxSteps=9999):
13          Thread.__init__(self)
14          self.wf = wf
15
16          self.status=-1
17          self.report=''
18          self.nfail=0
19          self.npass=0
20          self.noRun=noRun
21          self.dryRun=dryRun
22          self.cafVeto=cafVeto
23          self.dasOptions=dasOptions
24          self.jobReport=jobReport
25          self.nThreads=nThreads
26          self.nStreams=nStreams
27          self.maxSteps=maxSteps
28
29          self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
30          return
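
Because the constructor calls Thread.__init__(self), an instance can presumably be driven like any other thread: start() launches run() in the background and join() waits for it. The sketch below illustrates that pattern with stand-in classes only. FakeWorkflow, SketchRunner, and the values 1.0 and 'ExampleWorkflow' are hypothetical and not part of WorkFlowRunner.py; only the wfDir naming rule is copied from the constructor.

```python
import threading


class FakeWorkflow:
    """Hypothetical stand-in for the `wf` argument (only the attributes used here)."""

    def __init__(self, numId, nameId, cmds):
        self.numId = numId
        self.nameId = nameId
        self.cmds = cmds


class SketchRunner(threading.Thread):
    """Illustrative stand-in, not the real WorkFlowRunner."""

    def __init__(self, wf, dryRun=False):
        threading.Thread.__init__(self)
        self.wf = wf
        self.dryRun = dryRun
        self.report = ''
        # Same naming rule as the constructor: "<numId>_<nameId>"
        self.wfDir = str(self.wf.numId) + '_' + self.wf.nameId

    def run(self):
        # Executed in the new thread once start() has been called.
        self.report = '%s: %d step(s)' % (self.wfDir, len(self.wf.cmds))


runner = SketchRunner(FakeWorkflow(1.0, 'ExampleWorkflow', ['step one']), dryRun=True)
runner.start()  # returns immediately; run() executes in the background
runner.join()   # block until run() has finished
print(runner.wfDir)
print(runner.report)
```

Results such as report are only meaningful after join() returns, since run() fills them in from the worker thread.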

Member Function Documentation

def WorkFlowRunner.WorkFlowRunner.doCmd (   self,
  cmd 
)

Definition at line 31 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, print(), and WorkFlowRunner.WorkFlowRunner.wfDir.

31
32      def doCmd(self, cmd):
33
34          msg = "\n# in: " +os.getcwd()
35          if self.dryRun: msg += " dryRun for '"
36          else: msg += " going to execute "
37          msg += cmd.replace(';','\n')
38          print(msg)
39
40          cmdLog = open(self.wfDir+'/cmdLog','a')
41          cmdLog.write(msg+'\n')
42          cmdLog.close()
43
44          ret = 0
45          if not self.dryRun:
46              p = Popen(cmd, shell=True)
47              ret = os.waitpid(p.pid, 0)[1]
48              if ret != 0:
49                  print("ERROR executing ",cmd,'ret=', ret)
50
51          return ret

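
doCmd returns the second element of os.waitpid(), which on POSIX systems is the raw wait status rather than the shell exit code: for a normal exit, the exit code sits in the high byte. The following is a minimal sketch of that relationship, assuming a POSIX shell is available. The helper name run_and_wait is hypothetical and only mirrors the two lines used in doCmd.

```python
import os
from subprocess import Popen


def run_and_wait(cmd):
    """Run cmd through the shell and return the raw wait status, as doCmd does."""
    p = Popen(cmd, shell=True)
    return os.waitpid(p.pid, 0)[1]


status = run_and_wait('exit 3')
print(status)                  # raw wait status, not the exit code itself
print(os.WIFEXITED(status))    # whether the child exited normally
print(os.WEXITSTATUS(status))  # the exit code carried in the high byte
```

On this reading, a caller comparing the return value against 32000 is matching exit code 125, since 125 * 256 = 32000.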
def WorkFlowRunner.WorkFlowRunner.run (   self)

Definition at line 52 of file WorkFlowRunner.py.

References WorkFlowRunner.WorkFlowRunner.dryRun, runall.testit.nfail, WorkFlowRunner.WorkFlowRunner.nfail, TShapeAnalysis.npass, runall.testit.npass, WorkFlowRunner.WorkFlowRunner.npass, print(), and WorkFlowRunner.WorkFlowRunner.wfDir.

Referenced by Types.EventID.cppID(), and Types.LuminosityBlockID.cppID().

 52
 53      def run(self):
 54
 55          startDir = os.getcwd()
 56
 57          if not os.path.exists(self.wfDir):
 58              os.makedirs(self.wfDir)
 59          elif not self.dryRun: # clean up to allow re-running in the same overall devel area, then recreate the dir to make sure it exists
 60              print("cleaning up ", self.wfDir, ' in ', os.getcwd())
 61              shutil.rmtree(self.wfDir)
 62              os.makedirs(self.wfDir)
 63
 64          preamble = 'cd '+self.wfDir+'; '
 65
 66          realstarttime = datetime.now()
 67          startime='date %s' %time.asctime()
 68
 69          # check where we are running:
 70          onCAF = False
 71          if 'cms/caf/cms' in os.environ['CMS_PATH']:
 72              onCAF = True
 73
 74          ##needs to set
 75          #self.report
 76          self.npass = []
 77          self.nfail = []
 78          self.stat = []
 79          self.retStep = []
 80
 81          def closeCmd(i,ID):
 82              return ' > %s 2>&1; ' % ('step%d_'%(i,)+ID+'.log ',)
 83
 84          inFile=None
 85          lumiRangeFile=None
 86          aborted=False
 87          for (istepmone,com) in enumerate(self.wf.cmds):
 88              # isInputOk is used to keep track of the das result. In case this
 89              # is False we use a different error message to indicate the failed
 90              # das query.
 91              isInputOk=True
 92              istep=istepmone+1
 93              cmd = preamble
 94              if aborted:
 95                  self.npass.append(0)
 96                  self.nfail.append(0)
 97                  self.retStep.append(0)
 98                  self.stat.append('NOTRUN')
 99                  continue
100              if not isinstance(com,str):
101                  if self.cafVeto and (com.location == 'CAF' and not onCAF):
102                      print("You need to be no CAF to run",self.wf.numId)
103                      self.npass.append(0)
104                      self.nfail.append(0)
105                      self.retStep.append(0)
106                      self.stat.append('NOTRUN')
107                      aborted=True
108                      continue
109                  #create lumiRange file first so if das fails we get its error code
110                  cmd2 = com.lumiRanges()
111                  if cmd2:
112                      cmd2 =cmd+cmd2+closeCmd(istep,'lumiRanges')
113                      lumiRangeFile='step%d_lumiRanges.log'%(istep,)
114                      retStep = self.doCmd(cmd2)
115                  if (com.dataSetParent):
116                      cmd3=cmd+com.das(self.dasOptions,com.dataSetParent)+closeCmd(istep,'dasparentquery')
117                      retStep = self.doCmd(cmd3)
118                  cmd+=com.das(self.dasOptions,com.dataSet)
119                  cmd+=closeCmd(istep,'dasquery')
120                  retStep = self.doCmd(cmd)
121                  #don't use the file list executed, but use the das command of cmsDriver for next step
122                  # If the das output is not there or it's empty, consider it an
123                  # issue of this step, not of the next one.
124                  dasOutputPath = join(self.wfDir, 'step%d_dasquery.log'%(istep,))
125                  # Check created das output in no-dryRun mode only
126                  if not self.dryRun:
127                      if not exists(dasOutputPath):
128                          retStep = 1
129                          dasOutput = None
130                      else:
131                          # We consider only the files which have at least one logical filename
132                          # in it. This is because sometimes das fails and still prints out junk.
133                          dasOutput = [l for l in open(dasOutputPath).read().split("\n") if l.startswith("/")]
134                      if not dasOutput:
135                          retStep = 1
136                          isInputOk = False
137
138                  inFile = 'filelist:' + basename(dasOutputPath)
139                  print("---")
140              else:
141                  #chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
142                  cmd += com
143                  if self.noRun:
144                      cmd +=' --no_exec'
145                  # in case previous step used DAS query (either filelist of das:)
146                  # not to be applied for premixing stage1 to allow combiend stage1+stage2 workflow
147                  if inFile and not 'premix_stage1' in cmd:
148                      cmd += ' --filein '+inFile
149                      inFile=None
150                  if lumiRangeFile: #DAS query can also restrict lumi range
151                      cmd += ' --lumiToProcess '+lumiRangeFile
152                      lumiRangeFile=None
153                  # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
154                  if 'HARVESTING' in cmd and not 134==self.wf.numId and not '--filein' in cmd:
155                      cmd+=' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
156                  else:
157                      # Disable input for premix stage1 to allow combined stage1+stage2 workflow
158                      # Disable input for premix stage2 in FastSim to allow combined stage1+stage2 workflow (in FS, stage2 does also GEN)
159                      # Ugly hack but works
160                      if istep!=1 and not '--filein' in cmd and not 'premix_stage1' in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
161                          cmd+=' --filein file:step%s.root '%(istep-1,)
162                      if not '--fileout' in com:
163                          cmd+=' --fileout file:step%s.root '%(istep,)
164                  if self.jobReport:
165                      cmd += ' --suffix "-j JobReport%s.xml " ' % istep
166                  if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
167                      cmd += ' --nThreads %s' % self.nThreads
168                  if (self.nStreams > 0) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
169                      cmd += ' --nStreams %s' % self.nStreams
170                  cmd+=closeCmd(istep,self.wf.nameId)
171                  retStep = 0
172                  if istep>self.maxSteps:
173                      wf_stats = open("%s/wf_steps.txt" % self.wfDir,"a")
174                      wf_stats.write('step%s:%s\n' % (istep, cmd))
175                      wf_stats.close()
176                  else: retStep = self.doCmd(cmd)
177
178              self.retStep.append(retStep)
179              if retStep == 32000:
180                  # A timeout occurred
181                  self.npass.append(0)
182                  self.nfail.append(1)
183                  self.stat.append('TIMEOUT')
184                  aborted = True
185              elif (retStep!=0):
186                  #error occured
187                  self.npass.append(0)
188                  self.nfail.append(1)
189                  if not isInputOk:
190                      self.stat.append("DAS_ERROR")
191                  else:
192                      self.stat.append('FAILED')
193                  #to skip processing
194                  aborted=True
195              else:
196                  #things went fine
197                  self.npass.append(1)
198                  self.nfail.append(0)
199                  self.stat.append('PASSED')
200
201          os.chdir(startDir)
202          endtime='date %s' %time.asctime()
203          tottime='%s-%s'%(endtime,startime)
204
205
206          #### wrap up ####
207
208          logStat=''
209          for i,s in enumerate(self.stat):
210              logStat+='Step%d-%s '%(i,s)
211          self.report='%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+' '.join(map(str,self.run_placeholder)) if False else '%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+' '.join(map(str,self.retStep))+'\n'
212
213          return
214
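
For DAS-backed steps, run() treats the query as failed when the step's dasquery log is missing or contains no line starting with "/". A minimal sketch of that check follows, written as standalone helpers. The names read_das_files and das_step_ok and the "/store/example/..." paths are hypothetical; only the filtering rule is taken from the listing.

```python
import os
import tempfile


def read_das_files(path):
    """Return the lines of a DAS query log that look like logical file names.

    A missing log yields None; otherwise only lines starting with "/" are
    kept, so error chatter printed by a failed query is ignored.
    """
    if not os.path.exists(path):
        return None
    with open(path) as handle:
        return [line for line in handle.read().split("\n") if line.startswith("/")]


def das_step_ok(path):
    """True when the log exists and lists at least one file name."""
    return bool(read_das_files(path))


with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, "step1_dasquery.log")
    with open(good, "w") as handle:
        handle.write("/store/example/file1.root\nsome warning text\n/store/example/file2.root\n")
    bad = os.path.join(tmp, "step2_dasquery.log")
    with open(bad, "w") as handle:
        handle.write("query failed\n")
    good_files = read_das_files(good)
    results = (das_step_ok(good), das_step_ok(bad), das_step_ok(os.path.join(tmp, "missing.log")))

print(good_files)
print(results)
```

In run() the same condition sets retStep to 1 and isInputOk to False, which is what later turns the step status into DAS_ERROR instead of FAILED.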

Member Data Documentation

WorkFlowRunner.WorkFlowRunner.cafVeto

Definition at line 21 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dasOptions

Definition at line 22 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.dryRun

Definition at line 20 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.doCmd(), and WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.jobReport

Definition at line 23 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.maxSteps

Definition at line 26 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.nfail

Definition at line 17 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.noRun

Definition at line 19 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.npass

needs to set self.report

Definition at line 18 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.run().

WorkFlowRunner.WorkFlowRunner.nStreams

Definition at line 25 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.nThreads

Definition at line 24 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.report

Summary string assembled in the wrap-up section at the end of run().

Definition at line 16 of file WorkFlowRunner.py.

Referenced by dataset.Dataset.getPrimaryDatasetEntries(), and addOnTests.testit.run().
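
The report string is built at the end of run() from the per-step status labels, the two time stamps, and the per-step return values. The sketch below reproduces that formatting in a standalone function so the layout can be inspected. build_report and all argument values are hypothetical; the format expressions are copied from the listing.

```python
def build_report(num_id, name_id, stats, ret_steps, start_stamp, end_stamp):
    """Format a report line the way the wrap-up section of run() does."""
    log_stat = ''
    for i, s in enumerate(stats):
        # enumerate() starts at 0, so the first step is labelled Step0
        log_stat += 'Step%d-%s ' % (i, s)
    # The "time" field is the end stamp and start stamp joined by a hyphen
    tot_time = '%s-%s' % (end_stamp, start_stamp)
    return ('%s_%s %s - time %s; exit: ' % (num_id, name_id, log_stat, tot_time)
            + ' '.join(map(str, ret_steps)) + '\n')


report = build_report(1.0, 'ExampleWorkflow',
                      ['PASSED', 'FAILED', 'NOTRUN'], [0, 256, 0],
                      'date T0', 'date T1')
print(report, end='')
```

Two details are worth noting when parsing this line: step labels are zero-based even though log files are numbered from step 1, and the time field holds two stamps rather than an elapsed duration.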

WorkFlowRunner.WorkFlowRunner.retStep

Definition at line 78 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.stat

Definition at line 77 of file WorkFlowRunner.py.
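
The labels stored in stat follow a small decision rule in run(): 32000 means TIMEOUT, any other non-zero value means FAILED (or DAS_ERROR when the DAS input check failed), zero means PASSED, and every step after the first failure is NOTRUN. A compact sketch of that rule is given here. classify_step, summarize, and TIMEOUT_STATUS are hypothetical names introduced for illustration.

```python
TIMEOUT_STATUS = 32000  # value run() compares the step return value against


def classify_step(ret_step, is_input_ok=True):
    """Return (status label, abort remaining steps?) for one step's return value."""
    if ret_step == TIMEOUT_STATUS:
        return 'TIMEOUT', True
    if ret_step != 0:
        return ('FAILED' if is_input_ok else 'DAS_ERROR'), True
    return 'PASSED', False


def summarize(ret_steps):
    """Label a sequence of return values; steps after an abort become NOTRUN."""
    labels, aborted = [], False
    for ret in ret_steps:
        if aborted:
            labels.append('NOTRUN')
            continue
        label, aborted = classify_step(ret)
        labels.append(label)
    return labels


print(summarize([0, 256, 0]))
print(classify_step(1, is_input_ok=False))
```

Unlike this sketch, run() never executes the steps that follow a failure, so their recorded return value is simply 0.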

WorkFlowRunner.WorkFlowRunner.status

Definition at line 15 of file WorkFlowRunner.py.

Referenced by dirstructure.Comparison.__make_image(), and dirstructure.Comparison.__repr__().

WorkFlowRunner.WorkFlowRunner.wf

Definition at line 13 of file WorkFlowRunner.py.

WorkFlowRunner.WorkFlowRunner.wfDir

Definition at line 28 of file WorkFlowRunner.py.

Referenced by WorkFlowRunner.WorkFlowRunner.doCmd(), and WorkFlowRunner.WorkFlowRunner.run().
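
Every step command in run() is framed the same way: it is prefixed with 'cd <wfDir>; ' and suffixed with a redirect into a per-step log, so the log path is resolved inside wfDir. The following sketch shows the resulting shell string. frame_step and the example values are hypothetical, while close_cmd repeats the expression of the nested closeCmd helper.

```python
def close_cmd(step, label):
    """Redirect stdout and stderr to step<N>_<label>.log (same expression as closeCmd)."""
    return ' > %s 2>&1; ' % ('step%d_' % (step,) + label + '.log ',)


def frame_step(wf_dir, step, command, label):
    """Prefix a step command with 'cd <wfDir>; ' and append the log redirect."""
    return 'cd ' + wf_dir + '; ' + command + close_cmd(step, label)


framed = frame_step('1.0_ExampleWorkflow', 2, 'echo hello', 'ExampleWorkflow')
print(repr(framed))
```

Since the cd is part of the shell string, the working directory of the Python process itself is left unchanged by each step.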