00001 from Queue import Queue
00002 from threading import Thread
00003 from time import sleep
00004 import threading
00005 import sys
00006 import traceback
00007 from StringIO import StringIO
00008
00009
00010
00011 class _SchedulerQuitCommand(object):
00012 pass
00013
def transition(what, fromList, toList):
    """Move ``what`` from ``fromList`` to the end of ``toList``.

    Only the first occurrence of ``what`` in ``fromList`` is removed.

    :param what: item to move (the scheduler passes task ids).
    :param fromList: list currently holding ``what``; modified in place.
    :param toList: list that receives ``what``; modified in place.
    :raises ValueError: if ``what`` is not in ``fromList``. A diagnostic is
        printed first and ``toList`` is left untouched.
    """
    try:
        fromList.remove(what)
    except ValueError:
        # Format with %s so a non-string id cannot turn this into a
        # TypeError, and re-raise with a bare ``raise`` so the original
        # traceback is preserved.
        print("%s not in source list" % (what,))
        raise
    toList.append(what)
00021
class Scheduler(object):
    """Dependency-aware job scheduler driven by a single master thread.

    Jobs are registered with :meth:`parallel` or :meth:`serial`:

    * parallel jobs run on one of ``parallelThreads`` daemon worker threads;
    * serial jobs run on the master thread inside :meth:`run`.

    Each job lists the task ids it depends on. A job starts only once all
    of them are in ``doneJobs``, and it is moved to ``brokenJobs`` once any
    of them is broken.

    Two queues connect the threads:

    * ``workersQueue`` carries ``(taskId, commandSpec)`` pairs from the
      master to the workers;
    * ``resultsQueue`` carries ``(thread, commandSpec)`` pairs to the master,
      which calls ``commandSpec[0](*commandSpec[1:])`` for each one.

    The bookkeeping lists (``pendingJobs``, ``runningJobs``, ``doneJobs``,
    ``brokenJobs``) and ``errors`` are updated by callbacks the master runs
    from ``resultsQueue``. The worker loop itself only talks to the queues.

    A task fails when it raises an exception (its formatted traceback
    becomes the error) or returns a truthy value (that value becomes the
    error). A synthetic serial job, ``"final-job"``, depends on every
    registered job. Once nothing is pending any more, the workers are told
    to quit and :meth:`run` returns.
    """

    def __init__(self, parallelThreads, logDelegate=None):
        """Create a scheduler.

        :param parallelThreads: number of worker threads :meth:`run` starts.
        :param logDelegate: callable taking one string. It is invoked on the
            master thread for every :meth:`log` message. When not given,
            messages are printed.
        """
        self.workersQueue = Queue()   # master -> workers: (taskId, spec)
        self.resultsQueue = Queue()   # anyone -> master: (thread, spec)
        self.jobs = {}                # taskId -> {"scheduler", "deps", "spec"}
        self.pendingJobs = []
        self.runningJobs = []
        self.doneJobs = []
        self.brokenJobs = []
        self.parallelThreads = parallelThreads
        self.logDelegate = logDelegate
        self.errors = {}              # taskId -> error text / failing result
        if not logDelegate:
            self.logDelegate = self.__doLog

        # parallel() and serial() append every new task id to this list.
        # The same list object is the "deps" argument inside finalJobSpec,
        # so final-job waits for every job, including ones added later.
        self.finalJobDeps = []
        self.finalJobSpec = ([self.doSerial, "final-job", self.finalJobDeps] +
                             [self.__doLog, "Nothing else to be done, exiting."])
        self.resultsQueue.put((threading.currentThread(), self.finalJobSpec))
        self.jobs["final-job"] = {"scheduler": "serial",
                                  "deps": self.finalJobDeps,
                                  "spec": self.finalJobSpec}
        self.pendingJobs.append("final-job")

    def run(self):
        """Start the workers and run master callbacks until all workers quit.

        On Ctrl-C, tasks still waiting in ``workersQueue`` are discarded;
        those tasks stay listed in ``runningJobs``. Every worker is then
        told to quit once its current task finishes.

        After the last worker has quit, the callbacks already queued for the
        master at that moment are executed once. Callbacks they enqueue in
        turn are not run, so a self-requeueing serial job cannot keep this
        method alive.
        """
        for _ in xrange(self.parallelThreads):
            t = Thread(target=self.__createWorker())
            t.daemon = True
            t.start()

        self.notifyMaster(self.__rescheduleParallel)

        # Each worker decrements parallelThreads (via __releaseWorker) when
        # it processes a quit command.
        while self.parallelThreads:
            try:
                _sender, item = self.resultsQueue.get()
                item[0](*item[1:])
                # Throttle: doSerial re-queues itself while its dependencies
                # are pending, so without a pause the master would spin.
                sleep(0.1)
            except KeyboardInterrupt:
                from Queue import Empty
                print("Ctrl-c received, waiting for workers to finish")
                # The queue is unbounded, so Queue.full() is never true.
                # Drain until Empty instead. Workers may grab the last items
                # concurrently, so never rely on a separate empty() check.
                while True:
                    try:
                        self.workersQueue.get(False)
                    except Empty:
                        break
                self.shout(self.quit)

        # Flush what was queued for the master (e.g. pending log messages)
        # when the last worker quit. Only the master consumes resultsQueue,
        # so the snapshot size is a safe lower bound and get(False) cannot
        # find it empty.
        for _ in xrange(self.resultsQueue.qsize()):
            _sender, item = self.resultsQueue.get(False)
            item[0](*item[1:])

    def __createWorker(self):
        """Return the body of a worker thread.

        The worker repeatedly takes ``(taskId, spec)`` from ``workersQueue``
        and calls ``spec[0](*spec[1:])``. An exception becomes its formatted
        traceback, which then counts as the task's error. A
        :class:`_SchedulerQuitCommand` result ends the worker. Any other
        result is reported to the master, followed by a reschedule request.
        """
        def worker():
            while True:
                taskId, item = self.workersQueue.get()
                try:
                    result = item[0](*item[1:])
                except Exception:
                    s = StringIO()
                    traceback.print_exc(file=s)
                    result = s.getvalue()

                if isinstance(result, _SchedulerQuitCommand):
                    self.notifyMaster(self.__releaseWorker)
                    return
                self.log(str(item) + " done")
                self.notifyMaster(self.__updateJobStatus, taskId, result)
                self.notifyMaster(self.__rescheduleParallel)

        return worker

    def __releaseWorker(self):
        """Account for one worker that has exited (runs on the master)."""
        self.parallelThreads -= 1

    def parallel(self, taskId, deps, *spec):
        """Register job ``taskId`` to run ``spec[0](*spec[1:])`` on a worker.

        The job becomes eligible once every id in ``deps`` is done. It is
        marked broken if any of them breaks.

        Re-registering an id that is still pending only logs a warning. The
        new spec replaces the old one, but the id is queued a second time and
        so ends up scheduled twice.

        NOTE(review): this mutates job lists without locking. It appears
        intended to be called before run() or from a serial job on the
        master thread.
        """
        if taskId in self.pendingJobs:
            self.log("Double task %s" % taskId)
        self.jobs[taskId] = {"scheduler": "parallel", "deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)
        self.finalJobDeps.append(taskId)

    def __rescheduleParallel(self):
        """Break, finish or start pending parallel jobs (runs on the master).

        1. Move every pending parallel job with a broken dependency to
           ``brokenJobs``. Repeat until nothing changes, so breakage
           propagates along whole dependency chains in a single call,
           whatever order the jobs were registered in.
        2. If nothing at all is pending, tell the workers and the master to
           quit.
        3. Otherwise hand every pending parallel job whose dependencies are
           all done to the workers.
        """
        def pendingParallel():
            return [j for j in self.pendingJobs
                    if self.jobs[j]["scheduler"] == "parallel"]

        changed = True
        while changed:
            changed = False
            for taskId in pendingParallel():
                brokenDeps = [dep for dep in self.jobs[taskId]["deps"]
                              if dep in self.brokenJobs]
                if not brokenDeps:
                    continue
                transition(taskId, self.pendingJobs, self.brokenJobs)
                self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
                changed = True

        # final-job stays pending until it has finished or broken, so this
        # only fires once everything is resolved.
        if not self.pendingJobs:
            self.shout(self.quit)
            self.notifyMaster(self.quit)
            return

        for taskId in pendingParallel():
            pendingDeps = [dep for dep in self.jobs[taskId]["deps"]
                           if dep not in self.doneJobs]
            if pendingDeps:
                continue
            transition(taskId, self.pendingJobs, self.runningJobs)
            self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])

    def __updateJobStatus(self, taskId, error):
        """Move a running job to done (falsy ``error``) or broken.

        For a broken job, ``error`` is stored in ``errors``.
        """
        if not error:
            transition(taskId, self.runningJobs, self.doneJobs)
            return
        transition(taskId, self.runningJobs, self.brokenJobs)
        self.errors[taskId] = error

    def __scheduleParallel(self, taskId, commandSpec):
        """Queue ``commandSpec`` for execution by a worker thread."""
        self.workersQueue.put((taskId, commandSpec))

    def shout(self, *commandSpec):
        """Queue ``parallelThreads`` copies of ``commandSpec`` for the workers.

        This is used to send one quit command per live worker.
        """
        for x in xrange(self.parallelThreads):
            self.__scheduleParallel("quit-" + str(x), commandSpec)

    def notifyMaster(self, *commandSpec):
        """Ask the master thread to call ``commandSpec[0](*commandSpec[1:])``."""
        self.resultsQueue.put((threading.currentThread(), commandSpec))

    def serial(self, taskId, deps, *commandSpec):
        """Register job ``taskId`` to run on the master thread.

        The job runs ``commandSpec[0](*commandSpec[1:])`` once every id in
        ``deps`` is done. Its driver (:meth:`doSerial`) is queued for the
        master immediately.

        NOTE(review): like :meth:`parallel`, this mutates job lists without
        locking.
        """
        spec = [self.doSerial, taskId, deps] + list(commandSpec)
        self.resultsQueue.put((threading.currentThread(), spec))
        self.jobs[taskId] = {"scheduler": "serial", "deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)
        self.finalJobDeps.append(taskId)

    def doSerial(self, taskId, deps, *commandSpec):
        """Drive serial job ``taskId`` one step (runs on the master).

        * If a dependency is broken, the job is marked broken.
        * If some dependency is not done yet, the job re-queues itself.
        * Otherwise the command runs right here and its result (or
          exception traceback) decides whether the job is done or broken.

        A reschedule is requested after the job is marked broken or has run.
        """
        brokenDeps = [dep for dep in deps if dep in self.brokenJobs]
        if brokenDeps:
            transition(taskId, self.pendingJobs, self.brokenJobs)
            self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
            # Parallel jobs depending on this one must now be broken too.
            self.notifyMaster(self.__rescheduleParallel)
            return

        pendingDeps = [dep for dep in deps if dep not in self.doneJobs]
        if pendingDeps:
            # Not ready: try again on a later pass through resultsQueue.
            self.resultsQueue.put((threading.currentThread(), [self.doSerial, taskId, deps] + list(commandSpec)))
            return

        transition(taskId, self.pendingJobs, self.runningJobs)
        try:
            result = commandSpec[0](*commandSpec[1:])
        except Exception:
            s = StringIO()
            traceback.print_exc(file=s)
            result = s.getvalue()
        self.__updateJobStatus(taskId, result)

        self.notifyMaster(self.__rescheduleParallel)

    def log(self, s):
        """Send ``s`` to the log delegate, which is invoked on the master."""
        self.notifyMaster(self.logDelegate, s)

    def quit(self):
        """Log the quit request and return the worker stop sentinel."""
        self.log("Requested to quit.")
        return _SchedulerQuitCommand()

    def __doLog(self, s):
        """Default log delegate: print the message."""
        print(s)

    def reschedule(self):
        """Ask the master to run a scheduling pass for parallel jobs."""
        self.notifyMaster(self.__rescheduleParallel)
00216
def dummyTask():
    """Self-test task: succeed (return None) after a 0.1 second pause."""
    delay = 0.1
    sleep(delay)
00219
def dummyTaskLong():
    """Self-test task: succeed (return None) after a one-second pause."""
    seconds = 1
    sleep(seconds)
00222
def errorTask():
    """Self-test task: return a non-empty string, which the scheduler treats as failure."""
    message = "This will always have an error"
    return message
00225
def exceptionTask():
    """Self-test task: always raise ``Exception("foo")``."""
    failure = Exception("foo")
    raise failure
00228
00229
def scheduleMore(scheduler):
    """Register a download -> build -> install chain on ``scheduler``.

    Used by the self-test as the body of a serial job, to show that jobs can
    be added while the scheduler is already running.
    """
    for name, deps in (("download", []), ("build", ["download"])):
        scheduler.parallel(name, deps, dummyTask)
    scheduler.serial("install", ["build"], dummyTask)
00234
if __name__ == "__main__":
    # Self-test scenarios. Each one builds a fresh Scheduler, registers jobs,
    # calls run() and checks the resulting bookkeeping. "final-job" is the
    # synthetic job every Scheduler adds, so it appears in the counts and
    # lists asserted below.

    # No user jobs: only final-job runs, first with 10 workers, then with 1.
    scheduler = Scheduler(10)
    scheduler.run()

    scheduler = Scheduler(1)
    scheduler.run()

    # A parallel job whose command is the scheduler's own log().
    scheduler = Scheduler(10)
    scheduler.parallel("test", [], scheduler.log, "This is england");
    scheduler.run()

    # 50 independent parallel jobs: none breaks, and jobs holds the 50 plus
    # final-job.
    scheduler = Scheduler(10)
    for x in xrange(50):
        scheduler.parallel("test" + str(x), [], dummyTask)
    scheduler.run()

    assert(len(scheduler.brokenJobs) == 0)
    assert(len(scheduler.jobs) == 51)

    # A task returning a non-empty string counts as failed. final-job, which
    # depends on it, is broken too, hence two broken jobs.
    scheduler = Scheduler(1)
    scheduler.parallel("test", [], errorTask)
    scheduler.run()

    assert(len(scheduler.brokenJobs) == 2)
    assert(len(scheduler.runningJobs) == 0)
    assert(len(scheduler.doneJobs) == 0)

    # A job may be registered before the job it depends on. test2 still
    # waits for the slower test1.
    scheduler = Scheduler(10)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.parallel("test1", [], dummyTaskLong)
    scheduler.run()
    assert(scheduler.doneJobs == ["test1", "test2", "final-job"])

    # A failure breaks everything downstream: test2 fails, so test3 and
    # final-job are broken while test1 completes.
    scheduler = Scheduler(10)
    scheduler.parallel("test3", ["test2"], dummyTask)
    scheduler.parallel("test2", ["test1"], errorTask)
    scheduler.parallel("test1", [], dummyTaskLong)
    scheduler.run()
    assert(scheduler.doneJobs == ["test1"])
    assert(scheduler.brokenJobs == ["test2", "test3", "final-job"])

    # Interactive: exercises the KeyboardInterrupt handling in run(). The
    # user is asked to press Ctrl-C while the 250 jobs are processed.
    scheduler = Scheduler(2)
    for x in xrange(250):
        scheduler.parallel("test" + str(x), [], dummyTask)
    print "Print Control-C to continue"
    scheduler.run()

    # An exception raised by a task is recorded in errors.
    scheduler = Scheduler(2)
    scheduler.parallel("test", [], exceptionTask)
    scheduler.run()
    assert(scheduler.errors["test"])

    # An exception breaks the job's dependants; both get an errors entry.
    scheduler = Scheduler(2)
    scheduler.parallel("test0", [], dummyTask)
    scheduler.parallel("test1", [], exceptionTask)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.run()
    assert(scheduler.errors["test1"])
    assert(scheduler.errors["test2"])

    # A single serial job (executed by the master thread via doSerial).
    scheduler = Scheduler(2)
    scheduler.serial("test0", [], dummyTask)
    scheduler.run()
    assert(scheduler.doneJobs == ["test0", "final-job"])

    # A chain of serial jobs, registered in dependency order.
    scheduler = Scheduler(2)
    scheduler.serial("test0", [], dummyTask)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.run()
    assert(scheduler.doneJobs == ["test0", "test1", "final-job"])

    # The same chain, registered before its dependency.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.run()
    assert(scheduler.doneJobs == ["test0", "test1", "final-job"])

    # Serial and independent parallel jobs together. Completion order is not
    # fixed, so the list is sorted before comparing.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.parallel("test2", [], dummyTask)
    scheduler.parallel("test3", [], dummyTask)
    scheduler.run()
    scheduler.doneJobs.sort()
    assert(scheduler.doneJobs == ["final-job", "test0", "test1", "test2", "test3"])

    # Parallel jobs that depend on serial ones.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.parallel("test3", ["test2"], dummyTask)
    scheduler.run()
    assert(scheduler.doneJobs == ["test0", "test1", "test2", "test3", "final-job"])

    # A serial job that registers further jobs (see scheduleMore) while the
    # scheduler is already running.
    scheduler = Scheduler(3)
    scheduler.serial("check-pkg", [], scheduleMore, scheduler)
    scheduler.run()
    assert(scheduler.doneJobs == ["check-pkg", "download", "build", "install", "final-job"])