CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes
MatrixRunner.MatrixRunner Class Reference
Inheritance diagram for MatrixRunner.MatrixRunner:

Public Member Functions

def __init__ (self, wfIn=None, nThrMax=4, nThreads=1)
 
def activeThreads (self)
 
def runTests (self, opt)
 

Public Attributes

 maxThreads
 
 nThreads
 
 runDirs
 
 threadList
 
 workFlows
 

Detailed Description

Definition at line 9 of file MatrixRunner.py.

Constructor & Destructor Documentation

◆ __init__()

def MatrixRunner.MatrixRunner.__init__ (   self,
  wfIn = None,
  nThrMax = 4,
  nThreads = 1 
)

Definition at line 11 of file MatrixRunner.py.

def __init__(self, wfIn=None, nThrMax=4, nThreads=1):
    """Remember the workflows to run and the threading limits.

    Args:
        wfIn: the workflows that runTests() iterates over (stored as
            ``workFlows``; defaults to None).
        nThrMax: upper bound on concurrently active worker threads
            (stored as ``maxThreads``).
        nThreads: number of threads per process, as reported in the
            start-up message of runTests() (stored as ``nThreads``).
    """
    # Threading configuration and bookkeeping of the started workers.
    self.maxThreads = nThrMax
    self.nThreads = nThreads
    self.threadList = []

    # Workflows handed in by the caller.
    self.workFlows = wfIn

    # The directories in which each workflow ran; runTests() fills this
    # with one entry per workflow numId.
    self.runDirs = {}
21 
def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir, the_dir)

Member Function Documentation

◆ activeThreads()

def MatrixRunner.MatrixRunner.activeThreads (   self)

Definition at line 22 of file MatrixRunner.py.

References MatrixRunner.MatrixRunner.threadList.

Referenced by MatrixRunner.MatrixRunner.runTests(), and addOnTests.StandardTester.runTests().

def activeThreads(self):
    """Return how many entries of ``self.threadList`` report is_alive()."""
    return sum(1 for worker in self.threadList if worker.is_alive())
29 
30 

◆ runTests()

def MatrixRunner.MatrixRunner.runTests (   self,
  opt 
)

Definition at line 31 of file MatrixRunner.py.

References MatrixRunner.MatrixRunner.activeThreads(), mps_setup.append, submitPVResolutionJobs.count, dqmMemoryStats.float, join(), genParticles_cff.map, MatrixRunner.MatrixRunner.maxThreads, MatrixRunner.MatrixRunner.nThreads, print(), FastTimerService_cff.range, MatrixRunner.MatrixRunner.runDirs, str, MatrixRunner.MatrixRunner.threadList, MatrixRunner.MatrixRunner.workFlows, and MatrixReader.MatrixReader.workFlows.

def runTests(self, opt):
    """Run the selected workflows in worker threads and write a summary.

    One WorkFlowRunner is started per workflow, never letting more than
    ``self.maxThreads`` of them be alive at once.  When all have
    finished, their per-step pass/fail counts and reports are collected,
    printed and written to ``runall-report-step123-.log`` in the current
    working directory.

    Args:
        opt: options object.  This method reads ``testList``, ``dryRun``,
            ``cafVeto``, ``dasOptions``, ``jobReports``, ``nThreads``,
            ``nStreams`` and ``maxSteps`` from it.

    Returns:
        The sum of all per-step failure counts (0 when nothing failed).
    """
    testList = opt.testList
    dryRun = opt.dryRun
    cafVeto = opt.cafVeto

    startDir = os.getcwd()

    report = ''
    # maxThreads == 0 is the request to only create cfgs and logs.
    noRun = (self.maxThreads == 0)
    if noRun:
        print('Not running the wf, only creating cfgs and logs')
        self.maxThreads = 4
        print('resetting to default number of process threads = %s' % self.maxThreads)

    print('Running %s %s %s, each with %s thread%s per process' % ('up to' if self.maxThreads > 1 else '', self.maxThreads, 'concurrent jobs' if self.maxThreads > 1 else 'job', self.nThreads, 's' if self.nThreads > 1 else ''))

    # Convert the requested ids once rather than once per workflow; an
    # empty/None testList means "no filtering".
    selected = {float(x) for x in testList} if testList else None

    # A runner constructed without workflows (wfIn=None) has nothing to
    # schedule; fall through to the (empty) summary instead of raising.
    for wf in (self.workFlows or ()):

        if selected is not None and float(wf.numId) not in selected:
            continue

        item = wf.nameId
        if os.path.islink(item):
            continue  # ignore symlinks

        # make sure we don't run more than the allowed number of threads:
        while self.activeThreads() >= self.maxThreads:
            time.sleep(1)

        print('\nPreparing to run %s %s' % (wf.numId, item))
        sys.stdout.flush()
        current = WorkFlowRunner(wf, noRun, dryRun, cafVeto, opt.dasOptions, opt.jobReports, opt.nThreads, opt.nStreams, opt.maxSteps)
        self.threadList.append(current)
        current.start()
        if not dryRun:
            time.sleep(0.5)  # try to avoid race cond by sleeping 0.5 sec

    # wait until all threads are finished
    while self.activeThreads() > 0:
        time.sleep(0.5)

    # wrap up !
    totpassed = []
    totfailed = []

    def count(collect, result):
        # pad with zeros so that collect is at least as long as result
        collect.extend([0] * (len(result) - len(collect)))
        for i, c in enumerate(result):
            collect[i] += c

    for pingle in self.threadList:
        pingle.join()
        try:
            count(totpassed, pingle.npass)
            count(totfailed, pingle.nfail)
            report += pingle.report
            self.runDirs[pingle.wf.numId] = pingle.wfDir
        except Exception as e:
            # Best effort: one broken worker must not hide the results of
            # the others.  Terminate the message with a newline so it does
            # not run into the next report entry or the summary line.
            msg = "ERROR retrieving info from thread: " + str(e) + '\n'
            report += msg

    report += ' '.join(map(str, totpassed)) + ' tests passed, ' + ' '.join(map(str, totfailed)) + ' failed\n'
    print(report)
    sys.stdout.flush()

    runall_report_name = 'runall-report-step123-.log'
    # The context manager closes the file even if the write fails.
    with open(runall_report_name, 'w') as runall_report:
        runall_report.write(report)
    os.chdir(startDir)

    anyFail = sum(totfailed)

    return anyFail
107 
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
#define str(s)

Member Data Documentation

◆ maxThreads

MatrixRunner.MatrixRunner.maxThreads

Definition at line 16 of file MatrixRunner.py.

Referenced by MatrixRunner.MatrixRunner.runTests().

◆ nThreads

MatrixRunner.MatrixRunner.nThreads

Definition at line 17 of file MatrixRunner.py.

Referenced by MatrixRunner.MatrixRunner.runTests().

◆ runDirs

MatrixRunner.MatrixRunner.runDirs

Definition at line 20 of file MatrixRunner.py.

Referenced by MatrixRunner.MatrixRunner.runTests().

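◆ threadList

MatrixRunner.MatrixRunner.threadList

Definition at line 15 of file MatrixRunner.py.

Referenced by MatrixRunner.MatrixRunner.activeThreads(), and MatrixRunner.MatrixRunner.runTests().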

◆ workFlows

MatrixRunner.MatrixRunner.workFlows

Definition at line 13 of file MatrixRunner.py.

Referenced by MatrixRunner.MatrixRunner.runTests().