00001 from Queue import Queue
00002 from threading import Thread
00003 from time import sleep
00004 import threading
00005 import sys
00006 import traceback
00007 from StringIO import StringIO
00008
00009
00010
00011 class _SchedulerQuitCommand(object):
00012 pass
00013
00014 def transition(what, fromList, toList):
00015 try:
00016 fromList.remove(what)
00017 except ValueError, e:
00018 print what + " not in source list"
00019 raise e
00020 toList.append(what)
00021
00022 class Scheduler(object):
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 def __init__(self, parallelThreads, logDelegate=None):
00038 self.workersQueue = Queue()
00039 self.resultsQueue = Queue()
00040 self.jobs = {}
00041 self.pendingJobs = []
00042 self.runningJobs = []
00043 self.doneJobs = []
00044 self.brokenJobs = []
00045 self.parallelThreads = parallelThreads
00046 self.logDelegate = logDelegate
00047 self.errors = {}
00048 if not logDelegate:
00049 self.logDelegate = self.__doLog
00050
00051 def run(self):
00052 for i in xrange(self.parallelThreads):
00053 t = Thread(target=self.__createWorker())
00054 t.daemon = True
00055 t.start()
00056
00057 self.notifyMaster(self.__rescheduleParallel)
00058
00059 while self.parallelThreads:
00060 try:
00061 who, item = self.resultsQueue.get()
00062
00063 item[0](*item[1:])
00064 sleep(0.1)
00065 except KeyboardInterrupt:
00066 print "Ctrl-c received, waiting for workers to finish"
00067 while self.workersQueue.full():
00068 self.workersQueue.get(False)
00069 self.shout(self.quit)
00070
00071
00072 while self.resultsQueue.full():
00073 item = self.resultsQueue.get()
00074 item[0](*item[1:])
00075 return
00076
00077
00078 def __createWorker(self):
00079 def worker():
00080 while True:
00081 taskId, item = self.workersQueue.get()
00082 try:
00083 result = item[0](*item[1:])
00084 except Exception, e:
00085 s = StringIO()
00086 traceback.print_exc(file=s)
00087 result = s.getvalue()
00088
00089 if type(result) == _SchedulerQuitCommand:
00090 self.notifyMaster(self.__releaseWorker)
00091 return
00092 self.log(str(item) + " done")
00093 self.notifyMaster(self.__updateJobStatus, taskId, result)
00094 self.notifyMaster(self.__rescheduleParallel)
00095
00096 return worker
00097
00098 def __releaseWorker(self):
00099 self.parallelThreads -= 1
00100
00101 def parallel(self, taskId, deps, *spec):
00102 if taskId in self.pendingJobs:
00103 self.log("Double task %s" % taskId)
00104 self.jobs[taskId] = {"scheduler": "parallel", "deps": deps, "spec":spec}
00105 self.pendingJobs.append(taskId)
00106
00107
00108 def __rescheduleParallel(self):
00109 parallelJobs = [j for j in self.pendingJobs if self.jobs[j]["scheduler"] == "parallel"]
00110
00111
00112 for taskId in parallelJobs:
00113 brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs]
00114 if not brokenDeps:
00115 continue
00116 transition(taskId, self.pendingJobs, self.brokenJobs)
00117 self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
00118
00119
00120
00121 if not self.pendingJobs:
00122 self.shout(self.quit)
00123 self.notifyMaster(self.quit)
00124 return
00125
00126
00127
00128 for taskId in parallelJobs:
00129 pendingDeps = [dep for dep in self.jobs[taskId]["deps"] if not dep in self.doneJobs]
00130 if pendingDeps:
00131 continue
00132
00133 transition(taskId, self.pendingJobs, self.runningJobs)
00134 self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])
00135
00136
00137 def __updateJobStatus(self, taskId, error):
00138 if not error:
00139 transition(taskId, self.runningJobs, self.doneJobs)
00140 return
00141 transition(taskId, self.runningJobs, self.brokenJobs)
00142 self.errors[taskId] = error
00143
00144
00145 def __scheduleParallel(self, taskId, commandSpec):
00146 self.workersQueue.put((taskId, commandSpec))
00147
00148
00149 def shout(self, *commandSpec):
00150 for x in xrange(self.parallelThreads):
00151 self.__scheduleParallel("quit-" + str(x), commandSpec)
00152
00153
00154 def notifyMaster(self, *commandSpec):
00155 self.resultsQueue.put((threading.currentThread(), commandSpec))
00156
00157 def serial(self, taskId, deps, *commandSpec):
00158 spec = [self.doSerial, taskId, deps] + list(commandSpec)
00159 self.resultsQueue.put((threading.currentThread(), spec))
00160 self.jobs[taskId] = {"scheduler": "serial", "deps": deps, "spec": spec}
00161 self.pendingJobs.append(taskId)
00162
00163 def doSerial(self, taskId, deps, *commandSpec):
00164 brokenDeps = [dep for dep in deps if dep in self.brokenJobs]
00165 if brokenDeps:
00166 transition(taskId, self.pendingJobs, self.brokenJobs)
00167 self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
00168
00169 self.notifyMaster(self.__rescheduleParallel)
00170 return
00171
00172
00173 pendingDeps = [dep for dep in deps if not dep in self.doneJobs]
00174 if pendingDeps:
00175 self.resultsQueue.put((threading.currentThread(), [self.doSerial, taskId, deps] + list(commandSpec)))
00176 return
00177
00178 transition(taskId, self.pendingJobs, self.runningJobs)
00179 try:
00180 result = commandSpec[0](*commandSpec[1:])
00181 except Exception, e:
00182 s = StringIO()
00183 traceback.print_exc(file=s)
00184 result = s.getvalue()
00185 self.__updateJobStatus(taskId, result)
00186
00187 self.notifyMaster(self.__rescheduleParallel)
00188
00189
00190 def log(self, s):
00191 self.notifyMaster(self.logDelegate, s)
00192
00193
00194 def quit(self):
00195 self.log("Requested to quit.")
00196 return _SchedulerQuitCommand()
00197
00198
00199 def __doLog(self, s):
00200 print s
00201
00202 def reschedule(self):
00203 self.notifyMaster(self.__rescheduleParallel)
00204
00205 def dummyTask():
00206 sleep(0.1)
00207
00208 def dummyTaskLong():
00209 sleep(1)
00210
00211 def errorTask():
00212 return "This will always have an error"
00213
00214 def exceptionTask():
00215 raise Exception("foo")
00216
00217
00218 def scheduleMore(scheduler):
00219 scheduler.parallel("download", [], dummyTask)
00220 scheduler.parallel("build", ["download"], dummyTask)
00221 scheduler.serial("install", ["build"], dummyTask)
00222
00223 if __name__ == "__main__":
00224 scheduler = Scheduler(10)
00225 scheduler.run()
00226
00227 scheduler = Scheduler(1)
00228 scheduler.run()
00229
00230 scheduler = Scheduler(10)
00231 scheduler.parallel("test", [], scheduler.log, "This is england");
00232 scheduler.run()
00233
00234 scheduler = Scheduler(10)
00235 for x in xrange(50):
00236 scheduler.parallel("test" + str(x), [], dummyTask)
00237 scheduler.run()
00238 assert(len(scheduler.brokenJobs) == 0)
00239 assert(len(scheduler.jobs) == 50)
00240
00241 scheduler = Scheduler(1)
00242 scheduler.parallel("test", [], errorTask)
00243 scheduler.run()
00244 assert(len(scheduler.brokenJobs) == 1)
00245 assert(len(scheduler.runningJobs) == 0)
00246 assert(len(scheduler.doneJobs) == 0)
00247
00248
00249 scheduler = Scheduler(10)
00250 scheduler.parallel("test2", ["test1"], dummyTask)
00251 scheduler.parallel("test1", [], dummyTaskLong)
00252 scheduler.run()
00253 assert(scheduler.doneJobs == ["test1", "test2"])
00254
00255
00256 scheduler = Scheduler(10)
00257 scheduler.parallel("test3", ["test2"], dummyTask)
00258 scheduler.parallel("test2", ["test1"], errorTask)
00259 scheduler.parallel("test1", [], dummyTaskLong)
00260 scheduler.run()
00261 assert(scheduler.doneJobs == ["test1"])
00262 assert(scheduler.brokenJobs == ["test2", "test3"])
00263
00264
00265 scheduler = Scheduler(2)
00266 for x in xrange(250):
00267 scheduler.parallel("test" + str(x), [], dummyTask)
00268 print "Print Control-C to continue"
00269 scheduler.run()
00270
00271
00272 scheduler = Scheduler(2)
00273 scheduler.parallel("test", [], exceptionTask)
00274 scheduler.run()
00275 assert(scheduler.errors["test"])
00276
00277
00278 scheduler = Scheduler(2)
00279 scheduler.parallel("test0", [], dummyTask)
00280 scheduler.parallel("test1", [], exceptionTask)
00281 scheduler.parallel("test2", ["test1"], dummyTask)
00282 scheduler.run()
00283 assert(scheduler.errors["test1"])
00284 assert(scheduler.errors["test2"])
00285
00286
00287 scheduler = Scheduler(2)
00288 scheduler.serial("test0", [], dummyTask)
00289 scheduler.run()
00290 assert(scheduler.doneJobs == ["test0"])
00291
00292
00293
00294 scheduler = Scheduler(2)
00295 scheduler.serial("test0", [], dummyTask)
00296 scheduler.serial("test1", ["test0"], dummyTask)
00297 scheduler.run()
00298 assert(scheduler.doneJobs == ["test0", "test1"])
00299
00300
00301 scheduler = Scheduler(2)
00302 scheduler.serial("test1", ["test0"], dummyTask)
00303 scheduler.serial("test0", [], dummyTask)
00304 scheduler.run()
00305 assert(scheduler.doneJobs == ["test0", "test1"])
00306
00307
00308 scheduler = Scheduler(2)
00309 scheduler.serial("test1", ["test0"], dummyTask)
00310 scheduler.serial("test0", [], dummyTask)
00311 scheduler.parallel("test2", [], dummyTask)
00312 scheduler.parallel("test3", [], dummyTask)
00313 scheduler.run()
00314 scheduler.doneJobs.sort()
00315 assert(scheduler.doneJobs == ["test0", "test1", "test2", "test3"])
00316
00317
00318 scheduler = Scheduler(2)
00319 scheduler.serial("test1", ["test0"], dummyTask)
00320 scheduler.serial("test0", [], dummyTask)
00321 scheduler.parallel("test2", ["test1"], dummyTask)
00322 scheduler.parallel("test3", ["test2"], dummyTask)
00323 scheduler.run()
00324 assert(scheduler.doneJobs == ["test0", "test1", "test2", "test3"])
00325
00326
00327
00328
00329
00330
00331
00332 scheduler = Scheduler(3)
00333 scheduler.serial("check-pkg", [], scheduleMore, scheduler)
00334 scheduler.run()
00335 assert(scheduler.doneJobs == ["check-pkg", "download", "build", "install"])