1 from __future__
import print_function
2 from threading
import Thread
3 from Configuration.PyReleaseValidation
import WorkFlow
6 from subprocess
import Popen
7 from os.path
import exists, basename, join
8 from datetime
import datetime
11 def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0):
35 msg =
"\n# in: " +os.getcwd()
36 if self.
dryRun: msg +=
" dryRun for '" 37 else: msg +=
" going to execute " 38 msg += cmd.replace(
';',
'\n')
41 cmdLog = open(self.
wfDir+
'/cmdLog',
'a')
42 cmdLog.write(msg+
'\n')
47 p = Popen(cmd, shell=
True)
48 ret = os.waitpid(p.pid, 0)[1]
50 print(
"ERROR executing ",cmd,
'ret=', ret)
56 startDir = os.getcwd()
58 if not os.path.exists(self.
wfDir):
59 os.makedirs(self.
wfDir)
61 print(
"cleaning up ", self.
wfDir,
' in ', os.getcwd())
62 shutil.rmtree(self.
wfDir)
63 os.makedirs(self.
wfDir)
65 preamble =
'cd '+self.
wfDir+
'; ' 67 realstarttime = datetime.now()
68 startime=
'date %s' %time.asctime()
72 if 'cms/caf/cms' in os.environ[
'CMS_PATH']:
83 return ' > %s 2>&1; ' % (
'step%d_'%(i,)+ID+
'.log ',)
88 for (istepmone,com)
in enumerate(self.
wf.cmds):
101 if not isinstance(com,str):
102 if self.
cafVeto and (com.location ==
'CAF' and not onCAF):
103 print(
"You need to be no CAF to run",self.
wf.numId)
111 cmd2 = com.lumiRanges()
113 cmd2 =cmd+cmd2+closeCmd(istep,
'lumiRanges')
114 lumiRangeFile=
'step%d_lumiRanges.log'%(istep,)
115 retStep = self.
doCmd(cmd2)
116 if (com.dataSetParent):
117 cmd3=cmd+com.das(self.
dasOptions,com.dataSetParent)+closeCmd(istep,
'dasparentquery')
118 retStep = self.
doCmd(cmd3)
120 cmd+=closeCmd(istep,
'dasquery')
121 retStep = self.
doCmd(cmd)
125 dasOutputPath =
join(self.
wfDir,
'step%d_dasquery.log'%(istep,))
128 if not exists(dasOutputPath):
134 dasOutput = [l
for l
in open(dasOutputPath).
read().
split(
"\n")
if l.startswith(
"/")]
139 inFile =
'filelist:' + basename(dasOutputPath)
148 if inFile
and not 'premix_stage1' in cmd:
149 cmd +=
' --filein '+inFile
152 cmd +=
' --lumiToProcess '+lumiRangeFile
155 if 'HARVESTING' in cmd
and not 134==self.
wf.numId
and not '--filein' in cmd:
156 cmd+=
' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
161 if istep!=1
and not '--filein' in cmd
and not 'premix_stage1' in cmd
and not (
"--fast" in cmd
and "premix_stage2" in cmd):
162 steps = cmd.split(
"-s ")[1].
split(
" ")[0]
163 if "ALCA" not in steps:
164 cmd+=
' --filein file:step%s.root '%(istep-1,)
165 elif "ALCA" in steps
and "RECO" in steps:
166 cmd+=
' --filein file:step%s.root '%(istep-1,)
170 cmd+=
' --filein file:step%s.root '%(istep-1,)
171 if not '--fileout' in com:
172 cmd+=
' --fileout file:step%s.root '%(istep,)
176 cmd +=
' --suffix "-j JobReport%s.xml " ' % istep
177 if (self.
nThreads > 1)
and (
'HARVESTING' not in cmd)
and (
'ALCAHARVEST' not in cmd):
178 cmd +=
' --nThreads %s' % self.
nThreads 179 if (self.
nStreams > 0)
and (
'HARVESTING' not in cmd)
and (
'ALCAHARVEST' not in cmd):
180 cmd +=
' --nStreams %s' % self.
nStreams 183 split = cmd.split(event_token)
184 pos_cmd =
" ".
join(split[1].
split(
" ")[1:])
185 cmd = split[0] + event_token +
'%s ' % self.
nEvents + pos_cmd
186 cmd+=closeCmd(istep,self.
wf.nameId)
189 wf_stats = open(
"%s/wf_steps.txt" % self.
wfDir,
"a")
190 wf_stats.write(
'step%s:%s\n' % (istep, cmd))
192 else: retStep = self.
doCmd(cmd)
218 endtime=
'date %s' %time.asctime()
219 tottime=
'%s-%s'%(endtime,startime)
225 for i,s
in enumerate(self.
stat):
226 logStat+=
'Step%d-%s '%(i,s)
228 self.
report=
'%s_%s %s - time %s; exit: '%(self.
wf.numId,self.
wf.nameId,logStat,tottime)+
' '.
join(
map(str,self.
retStep))+
'\n'
def __init__(self, wf, noRun=False, dryRun=False, cafVeto=True, dasOptions="", jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def split(sequence, size)
static std::string join(char **cmd)
recoOutput
relying on the syntax: cmsDriver -s STEPS --otherFlags
npass
needs to set self.report