CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_2_9/src/PKGTOOLS/scheduler.py

Go to the documentation of this file.
import sys
import threading
import traceback
from Queue import Empty
from Queue import Queue
from StringIO import StringIO
from threading import Thread
from time import sleep
00008 
# Marker type for the "stop working" request. Having a dedicated class keeps
# the quit request distinguishable from the result codes returned by jobs.
class _SchedulerQuitCommand(object):
  """Value returned by the quit task to tell a worker to stop."""
00013 
# Move `what` from `fromList` to the end of `toList`.
#
# what:     the item (a task id in this file) to move.
# fromList: list that must currently contain `what`; the first occurrence
#           is removed.
# toList:   list that receives `what`.
#
# Raises ValueError when `what` is not in `fromList`; in that case a
# diagnostic is printed and `toList` is left untouched.
def transition(what, fromList, toList):
  try:
    fromList.remove(what)
  except ValueError:
    # Format with % so that a non-string id cannot turn the diagnostic into
    # a TypeError which would hide the real problem.
    print("%s not in source list" % (what,))
    # Bare raise keeps the traceback of the original failure.
    raise
  toList.append(what)
00021 
class Scheduler(object):
  # A simple job scheduler.
  #
  # Two queues are used. The workers queue tells the worker threads what to
  # do; the results queue is watched by the master thread (the one calling
  # run()) and carries the callables the master has to execute: job status
  # updates, rescheduling requests, log messages and serial jobs.
  #
  # All worker threads start by fetching from the workers queue and are
  # therefore blocked until something is scheduled. The master does the
  # scheduling and then sits processing the results queue.
  #
  # Scheduling means iterating over the pending parallel jobs and posting to
  # the workers queue every job whose dependencies are all done. A job with a
  # dependency which did not complete is moved to the broken list. When no
  # pending job is left, the "quit" task is posted to every worker.
  #
  # Serial jobs are executed by the master thread itself, from the results
  # queue, and are put back on that queue while their dependencies are still
  # pending.
  #
  # There is one special "final-job" which depends on every scheduled job,
  # either parallel or serial. It is serial, it is executed last, and it keeps
  # the scheduler alive until everything else is either done or broken.
  #
  # A job is a callable plus its arguments. A false return value means
  # success; anything else (including the traceback text of an exception the
  # job raised) is recorded in `errors` and marks the job as broken.

  # parallelThreads: number of worker threads started by run().
  # logDelegate:     optional callable taking one message; defaults to
  #                  printing the message.
  def __init__(self, parallelThreads, logDelegate=None):
    self.workersQueue = Queue()
    self.resultsQueue = Queue()
    self.jobs = {}
    self.pendingJobs = []
    self.runningJobs = []
    self.doneJobs = []
    self.brokenJobs = []
    self.parallelThreads = parallelThreads
    self.logDelegate = logDelegate
    self.errors = {}
    if not logDelegate:
      self.logDelegate = self.__doLog
    # Add a final job, which depends on any spawned task so that we do not
    # terminate until we are completely done. The dependency list is shared
    # on purpose: parallel() and serial() keep appending to it.
    self.finalJobDeps = []
    self.finalJobSpec = [self.doSerial, "final-job", self.finalJobDeps,
                         self.__doLog, "Nothing else to be done, exiting."]
    self.resultsQueue.put((threading.current_thread(), self.finalJobSpec))
    self.jobs["final-job"] = {"scheduler": "serial",
                              "deps": self.finalJobDeps,
                              "spec": self.finalJobSpec}
    self.pendingJobs.append("final-job")

  # Start the workers and process the results queue until every worker has
  # been released. Returns None; the outcome is left in doneJobs, brokenJobs
  # and errors.
  def run(self):
    for _ in range(self.parallelThreads):
      t = Thread(target=self.__createWorker())
      t.daemon = True
      t.start()

    self.notifyMaster(self.__rescheduleParallel)
    # Wait until all the workers are done.
    while self.parallelThreads:
      try:
        who, item = self.resultsQueue.get()
        item[0](*item[1:])
        sleep(0.1)
      except KeyboardInterrupt:
        print("Ctrl-c received, waiting for workers to finish")
        # Drop whatever has not been picked up yet, so that the quit request
        # is the next thing each worker sees. The queue is unbounded, hence
        # full() can not be used to detect queued work. Dropped jobs are left
        # in runningJobs.
        self.__drain(self.workersQueue)
        self.shout(self.quit)

    # Prune the queue. A worker posts its results before its release notice,
    # so only entries posted by the master itself can still be here. Log
    # messages are delivered; scheduling requests are dropped because there
    # is no worker left to serve them and a serial job with pending
    # dependencies would just keep re-posting itself.
    for who, item in self.__drain(self.resultsQueue):
      if item[0] == self.logDelegate:
        item[0](*item[1:])
    return

  # Remove and return, without blocking, everything currently in `target`.
  def __drain(self, target):
    drained = []
    while True:
      try:
        drained.append(target.get(False))
      except Empty:
        return drained

  # Call commandSpec[0] with the remaining elements as arguments. Returns
  # what the callable returns, or the formatted traceback if it raises.
  def __execute(self, commandSpec):
    try:
      return commandSpec[0](*commandSpec[1:])
    except Exception:
      return traceback.format_exc()

  # If any of `deps` is broken, move the pending job `taskId` to the broken
  # list, record why, and return True. Otherwise return False.
  def __failOnBrokenDeps(self, taskId, deps):
    brokenDeps = [dep for dep in deps if dep in self.brokenJobs]
    if not brokenDeps:
      return False
    transition(taskId, self.pendingJobs, self.brokenJobs)
    self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
    return True

  # Create a worker: the function executed by each worker thread.
  def __createWorker(self):
    def worker():
      while True:
        taskId, item = self.workersQueue.get()
        result = self.__execute(item)
        if isinstance(result, _SchedulerQuitCommand):
          self.notifyMaster(self.__releaseWorker)
          return
        self.log(str(item) + " done")
        self.notifyMaster(self.__updateJobStatus, taskId, result)
        self.notifyMaster(self.__rescheduleParallel)
        # Only in 2.5: self.workersQueue.task_done()
    return worker

  # Executed by the master when a worker has quit.
  def __releaseWorker(self):
    self.parallelThreads -= 1

  # Register a job to be executed by a worker thread.
  # taskId: identifier of the job.
  # deps:   ids of the jobs which must be done before this one starts.
  # spec:   the callable followed by its arguments.
  def parallel(self, taskId, deps, *spec):
    if taskId in self.pendingJobs:
      self.log("Double task %s" % taskId)
    self.jobs[taskId] = {"scheduler": "parallel", "deps": deps, "spec": spec}
    self.pendingJobs.append(taskId)
    self.finalJobDeps.append(taskId)

  # Does the rescheduling of tasks. Derived class should call it.
  def __rescheduleParallel(self):
    parallelJobs = [j for j in self.pendingJobs if self.jobs[j]["scheduler"] == "parallel"]
    # First of all clean up the pending parallel jobs from all those
    # which have broken dependencies.
    for taskId in parallelJobs:
      self.__failOnBrokenDeps(taskId, self.jobs[taskId]["deps"])

    # If no tasks left, quit. Notice we need to check also for serial jobs
    # since they might queue more parallel payloads.
    if not self.pendingJobs:
      self.shout(self.quit)
      self.notifyMaster(self.quit)
      return

    # Otherwise do another round of scheduling of all the tasks. In this
    # case we only queue parallel jobs to the parallel queue. Jobs marked as
    # broken above are skipped here too, since a broken dependency is never
    # in the done list.
    for taskId in parallelJobs:
      pendingDeps = [dep for dep in self.jobs[taskId]["deps"] if dep not in self.doneJobs]
      if pendingDeps:
        continue
      # No broken dependencies and no pending ones. We can continue.
      transition(taskId, self.pendingJobs, self.runningJobs)
      self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])

  # Update the job with the result of running: a false `error` moves it to
  # the done list, anything else to the broken list.
  def __updateJobStatus(self, taskId, error):
    if not error:
      transition(taskId, self.runningJobs, self.doneJobs)
      return
    transition(taskId, self.runningJobs, self.brokenJobs)
    self.errors[taskId] = error

  # One task at the time.
  def __scheduleParallel(self, taskId, commandSpec):
    self.workersQueue.put((taskId, commandSpec))

  # Helper to enqueue one copy of a command for each of the threads.
  def shout(self, *commandSpec):
    for x in range(self.parallelThreads):
      self.__scheduleParallel("quit-" + str(x), commandSpec)

  # Helper to enqueue replies to the master thread.
  def notifyMaster(self, *commandSpec):
    self.resultsQueue.put((threading.current_thread(), commandSpec))

  # Register a job to be executed by the master thread.
  # taskId:      identifier of the job.
  # deps:        ids of the jobs which must be done before this one starts.
  # commandSpec: the callable followed by its arguments.
  def serial(self, taskId, deps, *commandSpec):
    spec = [self.doSerial, taskId, deps] + list(commandSpec)
    self.resultsQueue.put((threading.current_thread(), spec))
    self.jobs[taskId] = {"scheduler": "serial", "deps": deps, "spec": spec}
    self.pendingJobs.append(taskId)
    self.finalJobDeps.append(taskId)

  # Try to execute a serial job. Called from the results queue.
  def doSerial(self, taskId, deps, *commandSpec):
    if self.__failOnBrokenDeps(taskId, deps):
      # Remember to do the scheduling again!
      self.notifyMaster(self.__rescheduleParallel)
      return

    # Put back the task on the queue, since it has pending dependencies.
    pendingDeps = [dep for dep in deps if dep not in self.doneJobs]
    if pendingDeps:
      self.resultsQueue.put((threading.current_thread(), [self.doSerial, taskId, deps] + list(commandSpec)))
      return
    # No broken dependencies and no pending ones. Run the job.
    transition(taskId, self.pendingJobs, self.runningJobs)
    self.__updateJobStatus(taskId, self.__execute(commandSpec))
    # Remember to do the scheduling again!
    self.notifyMaster(self.__rescheduleParallel)

  # Helper method to do logging: the message is delivered by the master.
  def log(self, s):
    self.notifyMaster(self.logDelegate, s)

  # Task which forces a worker to quit.
  def quit(self):
    self.log("Requested to quit.")
    return _SchedulerQuitCommand()

  # Helper for printouts.
  def __doLog(self, s):
    print(s)

  # Ask the master to do another round of scheduling.
  def reschedule(self):
    self.notifyMaster(self.__rescheduleParallel)
00216 
# Test payload: pause for a tenth of a second, then return nothing.
def dummyTask():
  pause = 0.1
  sleep(pause)
00219 
# Test payload: pause for a whole second, then return nothing.
def dummyTaskLong():
  pause = 1
  sleep(pause)
00222 
# Test payload which reports a failure by returning a non-empty message.
def errorTask():
  message = "This will always have an error"
  return message
00225 
# Test payload which fails by raising instead of returning.
def exceptionTask():
  failure = Exception("foo")
  raise failure
00228 
# Mimics cmsBuild workflow: queue a parallel "download", a parallel "build"
# which waits for it, and a serial "install" which waits for the build.
def scheduleMore(scheduler):
  parallelSteps = (("download", []), ("build", ["download"]))
  for stepId, stepDeps in parallelSteps:
    scheduler.parallel(stepId, stepDeps, dummyTask)
  scheduler.serial("install", ["build"], dummyTask)
00234 
# Self-test, executed only when the file is run as a script. One of the
# checks is interactive: it waits for Control-C (or for its jobs to finish).
if __name__ == "__main__":
  # Schedulers without any job must terminate on their own.
  scheduler = Scheduler(10)
  scheduler.run()

  scheduler = Scheduler(1)
  scheduler.run()

  # A job can use the scheduler logging facility.
  scheduler = Scheduler(10)
  scheduler.parallel("test", [], scheduler.log, "This is england")
  scheduler.run()

  scheduler = Scheduler(10)
  for x in range(50):
    scheduler.parallel("test" + str(x), [], dummyTask)
  scheduler.run()
  # Notice we have 51 jobs because there is always a toplevel one
  # which depends on all the others.
  assert len(scheduler.brokenJobs) == 0
  assert len(scheduler.jobs) == 51

  scheduler = Scheduler(1)
  scheduler.parallel("test", [], errorTask)
  scheduler.run()
  # Again, since the toplevel one always depends on all the others
  # it is always broken if something else is broken.
  assert len(scheduler.brokenJobs) == 2
  assert len(scheduler.runningJobs) == 0
  assert len(scheduler.doneJobs) == 0

  # Check dependency actually works: test2 is registered first but must
  # complete after test1.
  scheduler = Scheduler(10)
  scheduler.parallel("test2", ["test1"], dummyTask)
  scheduler.parallel("test1", [], dummyTaskLong)
  scheduler.run()
  assert scheduler.doneJobs == ["test1", "test2", "final-job"]

  # Check that a failing job breaks the jobs depending on it.
  scheduler = Scheduler(10)
  scheduler.parallel("test3", ["test2"], dummyTask)
  scheduler.parallel("test2", ["test1"], errorTask)
  scheduler.parallel("test1", [], dummyTaskLong)
  scheduler.run()
  assert scheduler.doneJobs == ["test1"]
  assert scheduler.brokenJobs == ["test2", "test3", "final-job"]

  # Check ctrl-C will exit properly.
  scheduler = Scheduler(2)
  for x in range(250):
    scheduler.parallel("test" + str(x), [], dummyTask)
  print("Press Control-C to continue")
  scheduler.run()

  # Handle tasks with exceptions.
  scheduler = Scheduler(2)
  scheduler.parallel("test", [], exceptionTask)
  scheduler.run()
  assert scheduler.errors["test"]

  # Handle tasks which depend on tasks with exceptions.
  scheduler = Scheduler(2)
  scheduler.parallel("test0", [], dummyTask)
  scheduler.parallel("test1", [], exceptionTask)
  scheduler.parallel("test2", ["test1"], dummyTask)
  scheduler.run()
  assert scheduler.errors["test1"]
  assert scheduler.errors["test2"]

  # Handle serial execution tasks.
  scheduler = Scheduler(2)
  scheduler.serial("test0", [], dummyTask)
  scheduler.run()
  assert scheduler.doneJobs == ["test0", "final-job"]

  # Handle serial execution tasks, one depends on
  # the previous one.
  scheduler = Scheduler(2)
  scheduler.serial("test0", [], dummyTask)
  scheduler.serial("test1", ["test0"], dummyTask)
  scheduler.run()
  assert scheduler.doneJobs == ["test0", "test1", "final-job"]

  # Serial tasks depending on one another, registered in reverse order.
  scheduler = Scheduler(2)
  scheduler.serial("test1", ["test0"], dummyTask)
  scheduler.serial("test0", [], dummyTask)
  scheduler.run()
  assert scheduler.doneJobs == ["test0", "test1", "final-job"]

  # Serial and parallel tasks being scheduled at the same time. The
  # completion order is not checked, hence the sort.
  scheduler = Scheduler(2)
  scheduler.serial("test1", ["test0"], dummyTask)
  scheduler.serial("test0", [], dummyTask)
  scheduler.parallel("test2", [], dummyTask)
  scheduler.parallel("test3", [], dummyTask)
  scheduler.run()
  scheduler.doneJobs.sort()
  assert scheduler.doneJobs == ["final-job", "test0", "test1", "test2", "test3"]

  # Serial and parallel tasks. Parallel depends on serial.
  scheduler = Scheduler(2)
  scheduler.serial("test1", ["test0"], dummyTask)
  scheduler.serial("test0", [], dummyTask)
  scheduler.parallel("test2", ["test1"], dummyTask)
  scheduler.parallel("test3", ["test2"], dummyTask)
  scheduler.run()
  assert scheduler.doneJobs == ["test0", "test1", "test2", "test3", "final-job"]

  # Serial task scheduling two parallel tasks and another dependent
  # serial task. This is actually what needs to be done for building
  # packages. I.e.
  # The first serial task is responsible for checking if a package is already there,
  # then it queues a parallel download sources task, a subsequent build sources
  # one and finally the install built package one.
  scheduler = Scheduler(3)
  scheduler.serial("check-pkg", [], scheduleMore, scheduler)
  scheduler.run()
  assert scheduler.doneJobs == ["check-pkg", "download", "build", "install", "final-job"]