2 from threading
import Thread
4 from Configuration.PyReleaseValidation
import WorkFlow
7 from subprocess
import Popen
8 from os.path
import exists, basename, join
10 from datetime
import datetime
11 from hashlib
import sha1
12 import urllib2, base64, json, re
13 from socket
import gethostname
20 es_hostname = getenv(
"ES_HOSTNAME")
21 es_auth = getenv(
"ES_AUTH")
22 if not es_hostname
and not es_auth:
25 sha1_id = sha1(kwds[
"release"] + kwds[
"architecture"] + kwds[
"workflow"] + str(kwds[
"step"])).hexdigest()
27 if "_201" in kwds[
"release"]:
28 datepart =
"201" + kwds[
"release"].
split(
"_201")[1]
29 d = datetime.strptime(datepart,
"%Y-%m-%d-%H00")
30 payload[
"release_queue"] = kwds[
"release"].
split(
"_201")[0]
31 payload[
"release_date"] = d.strftime(
"%Y-%m-%d-%H00")
33 logFile = payload.pop(
"log_file",
"")
41 payload[
"message"] = lines
42 for l
in lines.split(
"\n"):
43 if l.startswith(
"----- Begin Fatal Exception"):
46 if l.startswith(
"----- End Fatal Exception"):
49 if l.startswith(
"%MSG-e"):
52 error_kind = re.split(
" [0-9a-zA-Z-]* [0-9:]{8} CET", error)[0].
replace(
"%MSG-e ",
"")
54 if inError ==
True and l.startswith(
"%MSG"):
56 errors.append({
"error": error,
"kind": error_kind})
66 payload[
"exception"] = exception
68 payload[
"errors"] = errors
70 payload[
"hostname"] = gethostname()
71 url =
"https://%s/ib-matrix.%s/runTheMatrix-data/%s" % (es_hostname,
72 d.strftime(
"%Y-%W-1"),
74 request = urllib2.Request(url)
76 base64string = base64.encodestring(es_auth).
replace(
'\n',
'')
77 request.add_header(
"Authorization",
"Basic %s" % base64string)
78 request.get_method =
lambda:
'PUT'
79 data = json.dumps(payload)
81 result = urllib2.urlopen(request, data=data)
82 except urllib2.HTTPError, e:
90 def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1):
105 self.
wfDir=str(self.wf.numId)+
'_'+self.wf.nameId
110 msg =
"\n# in: " +os.getcwd()
111 if self.
dryRun: msg +=
" dryRun for '"
112 else: msg +=
" going to execute "
113 msg += cmd.replace(
';',
'\n')
116 cmdLog = open(self.
wfDir+
'/cmdLog',
'a')
117 cmdLog.write(msg+
'\n')
122 p = Popen(cmd, shell=
True)
123 ret = os.waitpid(p.pid, 0)[1]
125 print "ERROR executing ",cmd,
'ret=', ret
131 startDir = os.getcwd()
133 if not os.path.exists(self.
wfDir):
134 os.makedirs(self.
wfDir)
136 print "cleaning up ", self.
wfDir,
' in ', os.getcwd()
137 shutil.rmtree(self.
wfDir)
138 os.makedirs(self.
wfDir)
140 preamble =
'cd '+self.
wfDir+
'; '
142 realstarttime = datetime.now()
143 startime=
'date %s' %time.asctime()
147 if 'cms/caf/cms' in os.environ[
'CMS_PATH']:
158 return ' > %s 2>&1; ' % (
'step%d_'%(i,)+ID+
'.log ',)
163 for (istepmone,com)
in enumerate(self.wf.cmds):
173 self.retStep.append(0)
174 self.stat.append(
'NOTRUN')
176 if not isinstance(com,str):
177 if self.
cafVeto and (com.location ==
'CAF' and not onCAF):
178 print "You need to be no CAF to run",self.wf.numId
181 self.retStep.append(0)
182 self.stat.append(
'NOTRUN')
186 cmd2 = com.lumiRanges()
188 cmd2 =cmd+cmd2+closeCmd(istep,
'lumiRanges')
189 lumiRangeFile=
'step%d_lumiRanges.log'%(istep,)
190 retStep = self.
doCmd(cmd2)
192 cmd+=closeCmd(istep,
'dasquery')
193 retStep = self.
doCmd(cmd)
197 dasOutputPath =
join(self.
wfDir,
'step%d_dasquery.log'%(istep,))
198 if not exists(dasOutputPath):
204 dasOutput = [l
for l
in open(dasOutputPath).
read().
split(
"\n")
if l.startswith(
"/")]
209 inFile =
'filelist:' + basename(dasOutputPath)
217 cmd +=
' --filein '+inFile
220 cmd +=
' --lumiToProcess '+lumiRangeFile
223 if 'HARVESTING' in cmd
and not 134==self.wf.numId
and not '--filein' in cmd:
224 cmd+=
' --filein file:step%d_inDQM.root --fileout file:step%d.root '%(istep-1,istep)
226 if istep!=1
and not '--filein' in cmd:
227 cmd+=
' --filein file:step%s.root '%(istep-1,)
228 if not '--fileout' in com:
229 cmd+=
' --fileout file:step%s.root '%(istep,)
231 cmd +=
' --suffix "-j JobReport%s.xml " ' % istep
233 cmd +=
' --nThreads %s' % self.
nThreads
234 cmd+=closeCmd(istep,self.wf.nameId)
237 release=getenv(
"CMSSW_VERSION"),
238 architecture=getenv(
"SCRAM_ARCH"),
242 start_time=realstarttime.isoformat(),
243 workflow_id=self.wf.numId)
244 retStep = self.
doCmd(cmd)
248 self.retStep.append(retStep)
253 self.stat.append(
'TIMEOUT')
260 self.stat.append(
"DAS_ERROR")
262 self.stat.append(
'FAILED')
269 self.stat.append(
'PASSED')
272 release=getenv(
"CMSSW_VERSION"),
273 architecture=getenv(
"SCRAM_ARCH"),
276 status=self.
stat[-1],
277 start_time=realstarttime.isoformat(),
278 end_time=datetime.now().isoformat(),
279 delta_time=(datetime.now() - realstarttime).seconds,
280 workflow_id=self.wf.numId,
281 log_file=
"%s/step%d_%s.log" % (self.
wfDir, istep, self.wf.nameId))
285 endtime=
'date %s' %time.asctime()
286 tottime=
'%s-%s'%(endtime,startime)
292 for i,s
in enumerate(self.
stat):
293 logStat+=
'Step%d-%s '%(i,s)
294 self.
report=
'%s_%s %s - time %s; exit: '%(self.wf.numId,self.wf.nameId,logStat,tottime)+
' '.
join(
map(str,self.
retStep))+
'\n'
static std::string join(char **cmd)
npass
needs to set self.report