CMS 3D CMS Logo

MatrixRunner.py
Go to the documentation of this file.
1 import os, sys, time
2 
3 from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
4 from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
5 
6 # ================================================================================
7 
9 
10  def __init__(self, wfIn=None, nThrMax=4, nThreads=1):
11 
12  self.workFlows = wfIn
13 
14  self.threadList = []
15  self.maxThreads = nThrMax
16  self.nThreads = nThreads
17 
18  #the directories in which it happened
19  self.runDirs={}
20 
21  def activeThreads(self):
22 
23  nActive = 0
24  for t in self.threadList:
25  if t.isAlive() : nActive += 1
26 
27  return nActive
28 
29 
30  def runTests(self, opt):
31 
32  testList=opt.testList
33  dryRun=opt.dryRun
34  cafVeto=opt.cafVeto
35 
36  startDir = os.getcwd()
37 
38  report=''
39  noRun=(self.maxThreads==0)
40  if noRun:
41  print 'Not running the wf, only creating cfgs and logs'
42  print 'resetting to default number of threads'
43  self.maxThreads=4
44 
45  print 'Running in %s thread(s)' % self.maxThreads
46 
47 
48  for wf in self.workFlows:
49 
50  if testList and float(wf.numId) not in [float(x) for x in testList]: continue
51 
52  item = wf.nameId
53  if os.path.islink(item) : continue # ignore symlinks
54 
55  # make sure we don't run more than the allowed number of threads:
56  while self.activeThreads() >= self.maxThreads:
57  time.sleep(1)
58 
59  print '\nPreparing to run %s %s' % (wf.numId, item)
60  sys.stdout.flush()
61  current = WorkFlowRunner(wf,noRun,dryRun,cafVeto, opt.dasOptions, opt.jobReports, opt.nThreads, opt.maxSteps)
62  self.threadList.append(current)
63  current.start()
64  if not dryRun:
65  time.sleep(0.5) # try to avoid race cond by sleeping 0.5 sec
66 
67  # wait until all threads are finished
68  while self.activeThreads() > 0:
69  time.sleep(0.5)
70 
71 
72  #wrap up !
73  totpassed=[]
74  totfailed=[]
75  def count(collect,result):
76  #pad with zeros
77  for i in range(len(collect),len(result)):
78  collect.append(0)
79  for i,c in enumerate(result):
80  collect[i]+=c
81 
82  for pingle in self.threadList:
83  pingle.join()
84  try:
85  count(totpassed,pingle.npass)
86  count(totfailed,pingle.nfail)
87  report+=pingle.report
88  self.runDirs[pingle.wf.numId]=pingle.wfDir
89  except Exception as e:
90  msg = "ERROR retrieving info from thread: " + str(e)
91  report += msg
92 
93  report+=' '.join(map(str,totpassed))+' tests passed, '+' '.join(map(str,totfailed))+' failed\n'
94  print report
95  sys.stdout.flush()
96 
97  runall_report_name='runall-report-step123-.log'
98  runall_report=open(runall_report_name,'w')
99  runall_report.write(report)
100  runall_report.close()
101  os.chdir(startDir)
102 
103  anyFail=sum(totfailed)
104 
105  return anyFail
106 
def __init__(self, wfIn=None, nThrMax=4, nThreads=1)
Definition: MatrixRunner.py:10
def runTests(self, opt)
Definition: MatrixRunner.py:30
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
#define str(s)