CMS 3D CMS Logo

psClasses.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 from builtins import range
4 import os,subprocess,sys,re,time,random
5 from threading import *
6 from subprocess import call
7 #Some Constants
8 STATE_CREATED,STATE_COMPLETED,STATE_ERROR=list(range(3))
9 
10 
11 class BuildThread(Thread):
12 
13  def __init__(self, parent, queue, weight = 1):
14  Thread.__init__(self)
15  self.BuildNode = parent
16  self.QueueList = queue
17  self.Weight = weight
18  self.IsComplete = Condition()
19  self.Queue = None
20 
21  def putInServerQueue(self):
22  self.QueueList.QueueLock.acquire()
23  sSrv=self.QueueList.smallestQueue()
24  tSrv=self.QueueList.thinerQueue()
25  self.Queue=sSrv
26  if sSrv == tSrv:
27  self.QueueList[sSrv].append(self)
28  elif self.Weight + self.QueueList[sSrv].queueWeight() <= (self.QueueList.Cores/self.QueueList.Jay)*1.5:
29  self.QueueList[sSrv].append(self)
30  else:
31  self.QueueList[tSrv].append(self)
32  self.Queue=tSrv
33  self.QueueList.QueueLock.release()
34 
35  def build(self):
36  self.QueueList[self.Queue].QueueSem.acquire()
37  #call(['eval',"scram runtime -sh",";",'EdmPluginRefresh',self.BuildNode.LibName],shell="/bin/bash"))
38  rValue=call(['ssh',self.Queue,'cd ~/CMSSW_3_5_7;scram build -j %d' % self.QueueList.Jay,self.BuildNode.LibName])
39  self.QueueList.EdmRefreshLock.acquire()
40  call(['ssh',self.Queue,'cd ~/CMSSW_3_5_7;eval `scram runtime -sh`;EdmPluginRefresh %s' % self.BuildNode.LibName])
41  self.QueueList.EdmRefreshLock.release()
42  if rValue == 0:
43  self.BuildNode.State=STATE_COMPLETED
44  else:
45  print("Build failed for %s" % self.BuildNode.LibName)
46  self.BuildNode.State=STATE_ERROR
47  self.QueueList[self.Queue].QueueSem.release()
48 
49  def releaseAllLocks(self):
50  #for deps in self.BuildNode.DependsOn:
51  #deps.BThread.IsComplete.release()
52  self.BuildNode.State=STATE_ERROR
53  self.IsComplete.acquire()
54  self.IsComplete.notifyAll()
55  self.IsComplete.release()
56 
57  def run(self):
58  depsCompleted=False
59  while not depsCompleted:
60  depsCompleted=True
61  for deps in self.BuildNode.DependsOn:
62  if deps.State is STATE_ERROR :
63  self.releaseAllLocks()
64  return -1
65  if deps.State is not STATE_COMPLETED :
66  depsCompleted=False
67  deps.BThread.IsComplete.acquire()
68  deps.BThread.IsComplete.wait()
69  #deps.BThread.is_alive() and sys.stdout.write("Wait time exeded %s %s\n" % (deps.LibName,deps.Module))
70  deps.BThread.IsComplete.release()
71 
72  self.putInServerQueue()
73  self.build()
74  self.IsComplete.acquire()
75  self.IsComplete.notifyAll()
76  self.IsComplete.release()
77  return 0
78 
79 class BuildTreeNodeList (list):
80 
81  def __init__(self,value=None):
82  self.SeenLibs = []
83  self.SeenModules = []
84  self.SeenSubModules = []
85  if value:
86  list.__init__(self,value)
87  else:
88  list.__init__(self)
89 
90  def findLib(self,lib,head=None):
91  if len(self) == 0:
92  return None
93  if head == self:
94  return None
95  if head == None:
96  head = self
97  itP=None
98  for it in self:
99  if lib == it.LibName:
100  return it
101  else:
102  itP=it.AreDependent.findLib(lib,head)
103  if itP is not None:
104  return itP
105  return itP
106 
107  def startThreads(self):
108  for node in self:
109  if not node.BThread.is_alive():
110  try:
111  node.BThread.start()
112  except:
113  pass
114  node.AreDependent.startThreads()
115 
116  def findDep(self,dep):
117  return self.findLib(dep.replace("/",""))
118 
119  def __setitem__(self,index,value):
120  if not value.__class__ == BuildTreeNode:
121  raise TypeError("Expected BuildTreeNode")
122  self.SeenLibs.append(value.LibName)
123  self.SeenModules.append(value.Module)
124  self.SeenSubModules.append(value.SubModule)
125  list.__setitem__(self,index,value)
126 
127  def append(self,value):
128  if not value.__class__ == BuildTreeNode:
129  raise TypeError("Expected BuildTreeNode")
130  value.LibName not in self.SeenLibs and self.SeenLibs.append(value.LibName)
131  value.Module not in self.SeenModules and self.SeenModules.append(value.Module)
132  value.SubModule not in self.SeenSubModules and self.SeenSubModules.append(value.SubModule)
133  list.append(self,value)
134 
135  def __str__(self,topDown=False,direction=False):
136  if not topDown:
137  return "[\n...%s]" % str("\n...".join([str(it).replace("...","......") for it in self]))
138  else:
139  if direction:
140  return "[\n---%s]" % "\n---".join([ "'%s':'%s'\n------State = %d \n------DependsOn : %s " % (it.LibName,it.SubModule or it.Module,it.State,it.DependsOn.__str__(True,True).replace("---","------")) for it in self ])
141  else:
142  return "\n".join([it.__str__(True) for it in self])
143 
145 
146  def __init__(self,libname="",module="",submodule="",depends=None,areDependent=None,weight=1,srvqueue=None):
147  if depends==None:
149  else:
150  self.DependsOn = depends
151  if areDependent==None:
153  else:
154  self.AreDependent = areDependent
155  self.Module = module
156  self.LibName = libname
157  self.SubModule = submodule
158  self.BThread = srvqueue is not None and BuildThread(self,srvqueue,weight) or None
159  self.State = STATE_CREATED
160 
161  def __setattr__(self,name,value):
162  if name is "DependsOn" or name is "AreDependent":
163  if not value.__class__ == BuildTreeNodeList:
164  raise TypeError("Expected BuildTreeNodeList")
165  elif name is "State" or name is "ModulesToDo":
166  if not value.__class__ == int:
167  raise TypeError("Expected int")
168  elif name is "Module" or name is "SubModule" or name is "LibName":
169  if not value.__class__ == str:
170  raise TypeError("Expected str")
171  object.__setattr__(self,name,value)
172 
173  def __str__(self,topDown=False):
174  if not topDown:
175  return "'%s':'%s'\n...State = %d , Is it Done = %s \n...AreDependent : %s " % (self.LibName,self.SubModule or self.Module,self.State,self.BThread.IsComplete,self.AreDependent)
176  else:
177  if len(self.AreDependent)== 0:
178  return "'%s':'%s'\n---State = %d \n---DependsOn : %s " % (self.LibName,self.SubModule or self.Module,self.State,self.DependsOn.__str__(True,True).replace("---","------"))
179  else:
180  return self.AreDependent.__str__(topDown=True)
181 
184 class queueNode():
185  def __init__(self,cores = 4,jay = 2):
186  self.QueueSem = BoundedSemaphore(value=cores/jay)
187  self.RunningThreads = []
188  self.ThreadLog = []
189 
190  def append(self,item):
191  self.RunningThreads.append(item)
192  t=time.time()
193  self.ThreadLog.append([t,item.BuildNode.LibName,"INFO: %s thread %s added for Library %s" % (t,item.name,item.BuildNode.LibName)])
194 
195  def pendingThreads(self):
196  return len([x for x in self.RunningThreads if x.is_alive()])
197 
198  def queueWeight(self):
199  return sum([x.Weight for x in self.RunningThreads if x.is_alive()])
200 
201 
202 
203 class queueList(dict):
204  def __init__(self,servers = [],cores = 4,jay = 2):
205  dict.__init__(self,dict.fromkeys(servers))
206  for srv in self.keys():
207  self[srv]=queueNode(cores,jay)
208  self.Cores = cores
209  self.Jay = jay
210  self.QueueLock = RLock()
211  self.EdmRefreshLock = RLock()
212 
213 
214  def smallestQueue(self):
215  smallest=self.keys()[0]
216  sizeSmallest=self[smallest].pendingThreads()
217  for srv in self.keys()[1:]:
218  size=self[srv].pendingThreads()
219  if size < sizeSmallest:
220  smallest = srv
221  sizeSmallest = size
222  return smallest
223 
224  def thinerQueue(self):
225  thinnest=self.keys()[0]
226  weightThinnest=self[thinnest].queueWeight()
227  for srv in self.keys()[1:]:
228  weight=self[srv].queueWeight()
229  if weight < weightThinnest:
230  thinnest = srv
231  weightThinnest = weight
232  return thinnest
233 
234 
235 
236 
237 
238 
def __str__(self, topDown=False, direction=False)
Definition: psClasses.py:135
def queueWeight(self)
Definition: psClasses.py:198
def findLib(self, lib, head=None)
Definition: psClasses.py:90
def replace(string, replacements)
def findDep(self, dep)
Definition: psClasses.py:116
def __init__(self, cores=4, jay=2)
Definition: psClasses.py:185
def __init__(self, value=None)
Definition: psClasses.py:81
def pendingThreads(self)
Definition: psClasses.py:195
def thinerQueue(self)
Definition: psClasses.py:224
def __setattr__(self, name, value)
Definition: psClasses.py:161
def releaseAllLocks(self)
Definition: psClasses.py:49
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def __str__(self, topDown=False)
Definition: psClasses.py:173
def __init__(self, servers=[], cores=4, jay=2)
Definition: psClasses.py:204
def smallestQueue(self)
Definition: psClasses.py:214
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def __setitem__(self, index, value)
Definition: psClasses.py:119
def append(self, value)
Definition: psClasses.py:127
def __init__(self, libname="", module="", submodule="", depends=None, areDependent=None, weight=1, srvqueue=None)
Definition: psClasses.py:146
def putInServerQueue(self)
Definition: psClasses.py:21
def append(self, item)
Definition: psClasses.py:190
#define str(s)
def __init__(self, parent, queue, weight=1)
Definition: psClasses.py:13