CMS 3D CMS Logo

psClasses.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 from builtins import range
4 import os,subprocess,sys,re,time,random
5 from threading import *
6 from subprocess import call
7 #Some Constants
# Life-cycle states of a build node, numbered consecutively from zero.
STATE_CREATED, STATE_COMPLETED, STATE_ERROR = 0, 1, 2
9 
10 ### Classes
class BuildThread(Thread):
    """Thread that builds the library of one build-tree node over ssh.

    The thread first waits until every node in ``BuildNode.DependsOn`` has
    left the created state, then picks a server queue, runs the remote
    ``scram build`` followed by ``EdmPluginRefresh``, records the outcome in
    ``BuildNode.State`` and finally wakes every thread waiting on
    ``IsComplete``.

    Attributes:
        BuildNode:  the tree node this thread builds (``parent``).
        QueueList:  mapping of server name -> per-server queue; also provides
                    ``QueueLock``, ``EdmRefreshLock``, ``Cores``, ``Jay``,
                    ``smallestQueue()`` and ``thinerQueue()``.
        Weight:     relative cost of this build, used for load balancing.
        IsComplete: condition notified once this thread has finished
                    (successfully or not).
        Queue:      name of the server chosen by ``putInServerQueue``.
    """

    # Remote release area the ssh commands change into before running scram.
    ReleaseArea = '~/CMSSW_3_5_7'

    def __init__(self, parent, queue, weight = 1):
        Thread.__init__(self)
        self.BuildNode = parent
        self.QueueList = queue
        self.Weight = weight
        self.IsComplete = Condition()
        self.Queue = None

    def putInServerQueue(self):
        """Choose a server for this build and register the thread with it.

        The server with the fewest pending threads is preferred.  When it is
        not also the lightest one, it is still used as long as adding this
        build keeps its weight within 1.5 * Cores / Jay; otherwise the
        lightest server is used.  The chosen name is stored in ``self.Queue``.
        """
        queues = self.QueueList
        # 'with' guarantees the lock is released even if a lookup below raises.
        with queues.QueueLock:
            sSrv = queues.smallestQueue()
            tSrv = queues.thinerQueue()
            if sSrv == tSrv or (self.Weight + queues[sSrv].queueWeight()
                                <= (queues.Cores / queues.Jay) * 1.5):
                target = sSrv
            else:
                target = tSrv
            self.Queue = target
            queues[target].append(self)

    def build(self):
        """Run the remote build and plugin refresh, then record the result.

        Holds one slot of the chosen server's semaphore for the whole
        operation.  ``BuildNode.State`` becomes STATE_COMPLETED when the
        build command exits with 0 and STATE_ERROR otherwise.  The plugin
        refresh is executed regardless of the build's exit status.
        """
        libName = self.BuildNode.LibName
        with self.QueueList[self.Queue].QueueSem:
            rValue = call(['ssh', self.Queue,
                           'cd %s;scram build -j %d' % (self.ReleaseArea, self.QueueList.Jay),
                           libName])
            # Only one plugin refresh may run at a time across all servers.
            with self.QueueList.EdmRefreshLock:
                call(['ssh', self.Queue,
                      'cd %s;eval `scram runtime -sh`;EdmPluginRefresh %s' % (self.ReleaseArea, libName)])
            if rValue == 0:
                self.BuildNode.State = STATE_COMPLETED
            else:
                print("Build failed for %s" % libName)
                self.BuildNode.State = STATE_ERROR

    def releaseAllLocks(self):
        """Mark this node as failed and wake every thread waiting on it."""
        self.BuildNode.State = STATE_ERROR
        with self.IsComplete:
            self.IsComplete.notify_all()

    def run(self):
        """Thread body: wait for dependencies, build, then notify waiters.

        Returns -1 when a dependency ended in STATE_ERROR (this node is then
        marked as failed too) and 0 after the build was attempted.
        """
        for dep in self.BuildNode.DependsOn:
            done = dep.BThread.IsComplete
            with done:
                # The state is re-checked while holding the condition lock:
                # the dependency sets its state *before* notifying under the
                # same lock, so a wakeup can no longer be missed.
                while dep.State not in (STATE_COMPLETED, STATE_ERROR):
                    done.wait()
            if dep.State == STATE_ERROR:
                self.releaseAllLocks()
                return -1

        try:
            self.putInServerQueue()
            self.build()
        except Exception:
            # Never leave dependants blocked on a thread that died.
            self.releaseAllLocks()
            raise
        with self.IsComplete:
            self.IsComplete.notify_all()
        return 0
78 
80 
81  def __init__(self,value=None):
82  self.SeenLibs = []
83  self.SeenModules = []
84  self.SeenSubModules = []
85  if value:
86  list.__init__(self,value)
87  else:
88  list.__init__(self)
89 
90  def findLib(self,lib,head=None):
91  if len(self) == 0:
92  return None
93  if head == self:
94  return None
95  if head == None:
96  head = self
97  itP=None
98  for it in self:
99  if lib == it.LibName:
100  return it
101  else:
102  itP=it.AreDependent.findLib(lib,head)
103  if itP is not None:
104  return itP
105  return itP
106 
107  def startThreads(self):
108  for node in self:
109  if not node.BThread.is_alive():
110  try:
111  node.BThread.start()
112  except:
113  pass
114  node.AreDependent.startThreads()
115 
116  def findDep(self,dep):
117  return self.findLib(dep.replace("/",""))
118 
119  def __setitem__(self,index,value):
120  if not value.__class__ == BuildTreeNode:
121  raise TypeError("Expected BuildTreeNode")
122  self.SeenLibs.append(value.LibName)
123  self.SeenModules.append(value.Module)
124  self.SeenSubModules.append(value.SubModule)
125  list.__setitem__(self,index,value)
126 
127  def append(self,value):
128  if not value.__class__ == BuildTreeNode:
129  raise TypeError("Expected BuildTreeNode")
130  value.LibName not in self.SeenLibs and self.SeenLibs.append(value.LibName)
131  value.Module not in self.SeenModules and self.SeenModules.append(value.Module)
132  value.SubModule not in self.SeenSubModules and self.SeenSubModules.append(value.SubModule)
133  list.append(self,value)
134 
135  def __str__(self,topDown=False,direction=False):
136  if not topDown:
137  return "[\n...%s]" % str("\n...".join([str(it).replace("...","......") for it in self]))
138  else:
139  if direction:
140  return "[\n---%s]" % "\n---".join([ "'%s':'%s'\n------State = %d \n------DependsOn : %s " % (it.LibName,it.SubModule or it.Module,it.State,it.DependsOn.__str__(True,True).replace("---","------")) for it in self ])
141  else:
142  return "\n".join([it.__str__(True) for it in self])
143 
145 
146  def __init__(self,libname="",module="",submodule="",depends=None,areDependent=None,weight=1,srvqueue=None):
147  if depends==None:
149  else:
150  self.DependsOn = depends
151  if areDependent==None:
153  else:
154  self.AreDependent = areDependent
155  self.Module = module
156  self.LibName = libname
157  self.SubModule = submodule
158  self.BThread = srvqueue is not None and BuildThread(self,srvqueue,weight) or None
159  self.State = STATE_CREATED
160 
161  def __setattr__(self,name,value):
162  if name is "DependsOn" or name is "AreDependent":
163  if not value.__class__ == BuildTreeNodeList:
164  raise TypeError("Expected BuildTreeNodeList")
165  elif name is "State" or name is "ModulesToDo":
166  if not value.__class__ == int:
167  raise TypeError("Expected int")
168  elif name is "Module" or name is "SubModule" or name is "LibName":
169  if not value.__class__ == str:
170  raise TypeError("Expected str")
171  object.__setattr__(self,name,value)
172 
173  def __str__(self,topDown=False):
174  if not topDown:
175  return "'%s':'%s'\n...State = %d , Is it Done = %s \n...AreDependent : %s " % (self.LibName,self.SubModule or self.Module,self.State,self.BThread.IsComplete,self.AreDependent)
176  else:
177  if len(self.AreDependent)== 0:
178  return "'%s':'%s'\n---State = %d \n---DependsOn : %s " % (self.LibName,self.SubModule or self.Module,self.State,self.DependsOn.__str__(True,True).replace("---","------"))
179  else:
180  return self.AreDependent.__str__(topDown=True)
181 ####
182 # Class to manage server build queues
183 ####
class queueNode():
    """Build queue of a single server.

    Attributes:
        QueueSem:       bounded semaphore limiting how many builds run on
                        the server at once (cores // jay, at least one).
        RunningThreads: every thread ever appended to this queue.
        ThreadLog:      one ``[timestamp, libname, message]`` entry per
                        appended thread.
    """

    def __init__(self,cores = 4,jay = 2):
        # Floor division keeps the slot count an integer: with true division
        # a fractional count (e.g. 5/2) never reaches exactly zero and the
        # semaphore would never block.  At least one slot is always granted
        # so builds cannot stall forever when jay > cores.
        self.QueueSem = BoundedSemaphore(value=max(1, int(cores // jay)))
        self.RunningThreads = []
        self.ThreadLog = []

    def append(self,item):
        """Register build thread *item* with this queue and log the event."""
        self.RunningThreads.append(item)
        t=time.time()
        self.ThreadLog.append([t,item.BuildNode.LibName,"INFO: %s thread %s added for Library %s" % (t,item.name,item.BuildNode.LibName)])

    def pendingThreads(self):
        """Return how many registered threads are currently alive."""
        return sum(1 for x in self.RunningThreads if x.is_alive())

    def queueWeight(self):
        """Return the summed ``Weight`` of the registered threads still alive."""
        return sum(x.Weight for x in self.RunningThreads if x.is_alive())
200 
201 
202 
204  def __init__(self,servers = [],cores = 4,jay = 2):
205  dict.__init__(self,dict.fromkeys(servers))
206  for srv in self.keys():
207  self[srv]=queueNode(cores,jay)
208  self.Cores = cores
209  self.Jay = jay
210  self.QueueLock = RLock()
211  self.EdmRefreshLock = RLock()
212 
213 
214  def smallestQueue(self):
215  smallest=self.keys()[0]
216  sizeSmallest=self[smallest].pendingThreads()
217  for srv in self.keys()[1:]:
218  size=self[srv].pendingThreads()
219  if size < sizeSmallest:
220  smallest = srv
221  sizeSmallest = size
222  return smallest
223 
224  def thinerQueue(self):
225  thinnest=self.keys()[0]
226  weightThinnest=self[thinnest].queueWeight()
227  for srv in self.keys()[1:]:
228  weight=self[srv].queueWeight()
229  if weight < weightThinnest:
230  thinnest = srv
231  weightThinnest = weight
232  return thinnest
233 
234 
235 
236 
237 
238 
def __str__(self, topDown=False, direction=False)
Definition: psClasses.py:135
def queueWeight(self)
Definition: psClasses.py:198
def findLib(self, lib, head=None)
Definition: psClasses.py:90
def replace(string, replacements)
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def findDep(self, dep)
Definition: psClasses.py:116
def __init__(self, cores=4, jay=2)
Definition: psClasses.py:185
def __init__(self, value=None)
Definition: psClasses.py:81
def pendingThreads(self)
Definition: psClasses.py:195
def thinerQueue(self)
Definition: psClasses.py:224
def __setattr__(self, name, value)
Definition: psClasses.py:161
def releaseAllLocks(self)
Definition: psClasses.py:49
def __str__(self, topDown=False)
Definition: psClasses.py:173
def __init__(self, servers=[], cores=4, jay=2)
Definition: psClasses.py:204
def smallestQueue(self)
Definition: psClasses.py:214
static std::string join(char **cmd)
Definition: RemoteFile.cc:18
def __setitem__(self, index, value)
Definition: psClasses.py:119
def append(self, value)
Definition: psClasses.py:127
def __init__(self, libname="", module="", submodule="", depends=None, areDependent=None, weight=1, srvqueue=None)
Definition: psClasses.py:146
def putInServerQueue(self)
Definition: psClasses.py:21
def append(self, item)
Definition: psClasses.py:190
#define str(s)
def __init__(self, parent, queue, weight=1)
Definition: psClasses.py:13
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned since the trigger paths have been run