CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
WorkFlowRunner.WorkFlowRunner Class Reference
Inheritance diagram for WorkFlowRunner.WorkFlowRunner:

Public Member Functions

def __init__ (self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, maxSteps=9999)
 
def doCmd (self, cmd)
 
def run (self)
 

Public Attributes

 cafVeto
 
 dasOptions
 
 dryRun
 
 jobReport
 
 maxSteps
 
 nfail
 
 noRun
 
 npass
 needs to set self.report More...
 
 nThreads
 
 report
 wrap up #### More...
 
 retStep
 
 stat
 
 status
 
 wf
 
 wfDir
 

Detailed Description

Definition at line 10 of file WorkFlowRunner.py.

Constructor & Destructor Documentation

◆ __init__()

def WorkFlowRunner.WorkFlowRunner.__init__ (   self,
  wf,
  noRun = False,
  dryRun = False,
  cafVeto = True,
  dasOptions = "",
  jobReport = False,
  nThreads = 1,
  maxSteps = 9999 
)

Definition at line 11 of file WorkFlowRunner.py.

def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True,
             dasOptions="", jobReport=False, nThreads=1, maxSteps=9999):
    """Store the workflow and the run options on a new runner thread.

    wf         -- workflow object; its numId and nameId name the work
                  directory, and run() iterates over its cmds
    noRun      -- when true, run() appends ' --no_exec' to each step command
    dryRun     -- when true, doCmd() only logs commands instead of running them
    cafVeto    -- when true, run() refuses CAF-located input off CAF
    dasOptions -- passed through to the das() call of input steps
    jobReport  -- when true, run() adds a JobReport suffix to each step
    nThreads   -- values above 1 add ' --nThreads' to non-harvesting steps
    maxSteps   -- steps beyond this index are written out, not executed
    """
    Thread.__init__(self)

    # Workflow description and the options steering its execution.
    self.wf = wf
    self.noRun = noRun
    self.dryRun = dryRun
    self.cafVeto = cafVeto
    self.dasOptions = dasOptions
    self.jobReport = jobReport
    self.nThreads = nThreads
    self.maxSteps = maxSteps

    # Result placeholders; run() replaces npass/nfail with per-step lists
    # and fills in the report string.
    self.status = -1
    self.report = ''
    self.nfail = 0
    self.npass = 0

    # Work directory, named "<numId>_<nameId>".
    self.wfDir = '_'.join((str(self.wf.numId), self.wf.nameId))
29 

Member Function Documentation

◆ doCmd()

def WorkFlowRunner.WorkFlowRunner.doCmd (   self,
  cmd 
)

Definition at line 30 of file WorkFlowRunner.py.

def doCmd(self, cmd):
    """Announce a shell command and, unless in dry-run mode, execute it.

    The announcement (current directory plus the command, with every ';'
    turned into a line break) is printed and appended to <wfDir>/cmdLog.

    cmd -- shell command line, run through the shell via Popen(shell=True)

    Returns 0 in dry-run mode; otherwise the raw status word reported by
    os.waitpid() for the child, which is 0 on success.
    """
    msg = "\n# in: " + os.getcwd()
    if self.dryRun:
        msg += " dryRun for '"
    else:
        msg += " going to execute "
    msg += cmd.replace(';', '\n')
    print(msg)

    # The context manager closes the log even if the write fails.
    with open(self.wfDir + '/cmdLog', 'a') as cmdLog:
        cmdLog.write(msg + '\n')

    ret = 0
    if not self.dryRun:
        p = Popen(cmd, shell=True)
        # Keep the raw wait status rather than the decoded exit code:
        # run() compares this value against 32000 to detect a timeout.
        ret = os.waitpid(p.pid, 0)[1]
        if ret != 0:
            print("ERROR executing ", cmd, 'ret=', ret)

    return ret
50 

References WorkFlowRunner.WorkFlowRunner.dryRun, edm.print(), and WorkFlowRunner.WorkFlowRunner.wfDir.

◆ run()

def WorkFlowRunner.WorkFlowRunner.run (   self)

Definition at line 51 of file WorkFlowRunner.py.

def run(self):
    """Run all steps of the workflow in order and record their outcome.

    For every entry of self.wf.cmds one value is appended to each of
    self.npass, self.nfail, self.stat and self.retStep.  A step that is not
    a plain string is treated as an input description: its lumi-range and
    DAS queries are executed and the resulting file list is handed to the
    following step.  A plain string is extended with the input/output and
    threading options and then executed through self.doCmd().  After the
    first failing step all remaining steps are recorded as 'NOTRUN'.

    On return self.report holds a one-line summary of the workflow.
    """
    startDir = os.getcwd()

    if not os.path.exists(self.wfDir):
        os.makedirs(self.wfDir)
    elif not self.dryRun:
        # clean up to allow re-running in the same overall devel area,
        # then recreate the dir to make sure it exists
        print("cleaning up ", self.wfDir, ' in ', os.getcwd())
        shutil.rmtree(self.wfDir)
        os.makedirs(self.wfDir)

    preamble = 'cd '+self.wfDir+'; '

    startime = 'date %s' % time.asctime()

    # check where we are running; an unset CMS_PATH is treated as "not on CAF"
    # instead of aborting the whole thread with a KeyError
    onCAF = 'cms/caf/cms' in os.environ.get('CMS_PATH', '')

    self.npass = []
    self.nfail = []
    self.stat = []
    self.retStep = []

    # exit status that marks a timed-out step
    timeoutStatus = 32000

    def closeCmd(i, ID):
        # shell suffix redirecting all output of step i to step<i>_<ID>.log
        return ' > %s 2>&1; ' % ('step%d_' % (i,)+ID+'.log ',)

    def markNotRun():
        # bookkeeping for a step that is skipped altogether
        self.npass.append(0)
        self.nfail.append(0)
        self.retStep.append(0)
        self.stat.append('NOTRUN')

    inFile = None
    lumiRangeFile = None
    aborted = False
    for (istepmone, com) in enumerate(self.wf.cmds):
        # isInputOk is used to keep track of the das result. In case this
        # is False we use a different error message to indicate the failed
        # das query.
        isInputOk = True
        istep = istepmone+1
        cmd = preamble
        if aborted:
            markNotRun()
            continue
        if not isinstance(com, str):
            if self.cafVeto and (com.location == 'CAF' and not onCAF):
                print("You need to be on CAF to run", self.wf.numId)
                markNotRun()
                aborted = True
                continue
            # create lumiRange file first so if das fails we get its error code
            cmd2 = com.lumiRanges()
            if cmd2:
                cmd2 = cmd+cmd2+closeCmd(istep, 'lumiRanges')
                lumiRangeFile = 'step%d_lumiRanges.log' % (istep,)
                retStep = self.doCmd(cmd2)
            if (com.dataSetParent):
                cmd3 = cmd+com.das(self.dasOptions, com.dataSetParent)+closeCmd(istep, 'dasparentquery')
                retStep = self.doCmd(cmd3)
            cmd += com.das(self.dasOptions, com.dataSet)
            cmd += closeCmd(istep, 'dasquery')
            retStep = self.doCmd(cmd)
            # don't use the file list executed, but use the das command of cmsDriver for next step
            # If the das output is not there or it's empty, consider it an
            # issue of this step, not of the next one.
            dasOutputPath = join(self.wfDir, 'step%d_dasquery.log' % (istep,))
            # Check created das output in no-dryRun mode only
            if not self.dryRun:
                if not exists(dasOutputPath):
                    retStep = 1
                    dasOutput = None
                else:
                    # We consider only the files which have at least one logical filename
                    # in it. This is because sometimes das fails and still prints out junk.
                    with open(dasOutputPath) as dasLog:
                        dasOutput = [l for l in dasLog.read().split("\n") if l.startswith("/")]
                if not dasOutput:
                    retStep = 1
                    isInputOk = False

            inFile = 'filelist:' + basename(dasOutputPath)
            print("---")
        else:
            # chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
            cmd += com
            if self.noRun:
                cmd += ' --no_exec'
            # in case previous step used DAS query (either filelist of das:)
            # not to be applied for premixing stage1 to allow combined stage1+stage2 workflow
            if inFile and 'premix_stage1' not in cmd:
                cmd += ' --filein '+inFile
                inFile = None
            if lumiRangeFile:  # DAS query can also restrict lumi range
                cmd += ' --lumiToProcess '+lumiRangeFile
                lumiRangeFile = None
            # 134 is an existing workflow where harvesting has to operate on AlcaReco and NOT on DQM; hard-coded..
            if 'HARVESTING' in cmd and not 134 == self.wf.numId and '--filein' not in cmd:
                cmd += ' --filein file:step%d_inDQM.root --fileout file:step%d.root ' % (istep-1, istep)
            else:
                # Disable input for premix stage1 to allow combined stage1+stage2 workflow
                # Disable input for premix stage2 in FastSim to allow combined stage1+stage2 workflow (in FS, stage2 does also GEN)
                # Ugly hack but works
                if istep != 1 and '--filein' not in cmd and 'premix_stage1' not in cmd and not ("--fast" in cmd and "premix_stage2" in cmd):
                    cmd += ' --filein file:step%s.root ' % (istep-1,)
                if '--fileout' not in com:
                    cmd += ' --fileout file:step%s.root ' % (istep,)
            if self.jobReport:
                cmd += ' --suffix "-j JobReport%s.xml " ' % istep
            if (self.nThreads > 1) and ('HARVESTING' not in cmd) and ('ALCAHARVEST' not in cmd):
                cmd += ' --nThreads %s' % self.nThreads
            cmd += closeCmd(istep, self.wf.nameId)
            retStep = 0
            if istep > self.maxSteps:
                # beyond maxSteps the command is only written out, not executed
                with open("%s/wf_steps.txt" % self.wfDir, "a") as wf_stats:
                    wf_stats.write('step%s:%s\n' % (istep, cmd))
            else:
                retStep = self.doCmd(cmd)

        self.retStep.append(retStep)
        if retStep == timeoutStatus:
            # A timeout occurred
            self.npass.append(0)
            self.nfail.append(1)
            self.stat.append('TIMEOUT')
            aborted = True
        elif retStep != 0:
            # error occurred
            self.npass.append(0)
            self.nfail.append(1)
            if not isInputOk:
                self.stat.append("DAS_ERROR")
            else:
                self.stat.append('FAILED')
            # to skip processing
            aborted = True
        else:
            # things went fine
            self.npass.append(1)
            self.nfail.append(0)
            self.stat.append('PASSED')

    os.chdir(startDir)
    endtime = 'date %s' % time.asctime()
    tottime = '%s-%s' % (endtime, startime)

    #### wrap up ####
    logStat = ''
    for i, s in enumerate(self.stat):
        logStat += 'Step%d-%s ' % (i, s)
    self.report = '%s_%s %s - time %s; exit: ' % (self.wf.numId, self.wf.nameId, logStat, tottime)+' '.join(map(str, self.retStep))+'\n'

    return
210 

References WorkFlowRunner.WorkFlowRunner.dryRun, runall.testit.nfail, WorkFlowRunner.WorkFlowRunner.nfail, TShapeAnalysis.npass, runall.testit.npass, WorkFlowRunner.WorkFlowRunner.npass, edm.print(), and WorkFlowRunner.WorkFlowRunner.wfDir.

Referenced by Types.EventID.cppID(), and Types.LuminosityBlockID.cppID().

Member Data Documentation

◆ cafVeto

WorkFlowRunner.WorkFlowRunner.cafVeto

Definition at line 21 of file WorkFlowRunner.py.

◆ dasOptions

WorkFlowRunner.WorkFlowRunner.dasOptions

Definition at line 22 of file WorkFlowRunner.py.

◆ dryRun

WorkFlowRunner.WorkFlowRunner.dryRun

◆ jobReport

WorkFlowRunner.WorkFlowRunner.jobReport

Definition at line 23 of file WorkFlowRunner.py.

◆ maxSteps

WorkFlowRunner.WorkFlowRunner.maxSteps

Definition at line 25 of file WorkFlowRunner.py.

◆ nfail

WorkFlowRunner.WorkFlowRunner.nfail

Definition at line 17 of file WorkFlowRunner.py.

Referenced by addOnTests.testit.run(), and WorkFlowRunner.WorkFlowRunner.run().

◆ noRun

WorkFlowRunner.WorkFlowRunner.noRun

Definition at line 19 of file WorkFlowRunner.py.

◆ npass

WorkFlowRunner.WorkFlowRunner.npass

needs to set self.report

Definition at line 18 of file WorkFlowRunner.py.

Referenced by addOnTests.testit.run(), and WorkFlowRunner.WorkFlowRunner.run().

◆ nThreads

WorkFlowRunner.WorkFlowRunner.nThreads

Definition at line 24 of file WorkFlowRunner.py.

◆ report

WorkFlowRunner.WorkFlowRunner.report

wrap up ####

Definition at line 16 of file WorkFlowRunner.py.

Referenced by dataset.Dataset.getPrimaryDatasetEntries(), and addOnTests.testit.run().

◆ retStep

WorkFlowRunner.WorkFlowRunner.retStep

Definition at line 77 of file WorkFlowRunner.py.

◆ stat

WorkFlowRunner.WorkFlowRunner.stat

Definition at line 76 of file WorkFlowRunner.py.

◆ status

WorkFlowRunner.WorkFlowRunner.status

◆ wf

WorkFlowRunner.WorkFlowRunner.wf

Definition at line 13 of file WorkFlowRunner.py.

◆ wfDir

WorkFlowRunner.WorkFlowRunner.wfDir
join
static std::string join(char **cmd)
Definition: RemoteFile.cc:17
cms::dd::split
std::vector< std::string_view > split(std::string_view, const char *)
str
#define str(s)
Definition: TestProcessor.cc:48
mps_setup.append
append
Definition: mps_setup.py:85
edm::print
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38
writedatasetfile.run
run
Definition: writedatasetfile.py:27
genParticles_cff.map
map
Definition: genParticles_cff.py:11