CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/PKGTOOLS/scheduler.py

Go to the documentation of this file.
00001 from Queue import Queue
00002 from threading import Thread
00003 from time import sleep
00004 import threading
00005 import sys
00006 import traceback
00007 from StringIO import StringIO
00008 
00009 # Helper class to avoid conflict between result
00010 # codes and quit state transition.
00011 class _SchedulerQuitCommand(object):
00012   pass
00013 
def transition(what, fromList, toList):
  """Move `what` from `fromList` to the end of `toList`.

  Only the first occurrence of `what` is removed from `fromList`.

  Raises:
    ValueError: if `what` is not in `fromList`. A diagnostic line is printed
      first and `toList` is left untouched.
  """
  try:
    fromList.remove(what)
  except ValueError:
    # Use %-formatting rather than string concatenation so that a non-string
    # id is still reported instead of failing with TypeError and hiding the
    # real problem.
    print("%s not in source list" % (what,))
    # Bare raise re-raises the active exception with its original traceback.
    raise
  toList.append(what)
00021 
class Scheduler(object):
  """A simple job scheduler built around two queues.

  The workers queue tells the worker threads what to do. The results queue is
  watched by the master thread (the one that calls run()), which executes
  every command posted there; this is how workers report back and how all the
  bookkeeping lists end up being modified from the master thread's loop.

  All worker threads start by fetching from the workers queue (and are
  therefore blocked until something is posted). The master does the
  scheduling and then sits waiting for results. Scheduling means iterating
  over the pending jobs and posting to the workers queue every parallel job
  whose dependencies are all done. A job with a dependency which did not
  complete is moved to the broken list. When there are no pending jobs left,
  a quit command is posted for every worker.

  Task commands are given as a callable followed by its positional
  arguments. A task reports failure by returning a true value (typically an
  error string) or by raising an exception; a false return value means
  success.
  """

  def __init__(self, parallelThreads, logDelegate=None):
    """Create a scheduler.

    Args:
      parallelThreads: number of worker threads run() will start.
      logDelegate: callable taking one message argument; when not given,
        messages are printed.
    """
    self.workersQueue = Queue()
    self.resultsQueue = Queue()
    # taskId -> {"scheduler": "parallel" or "serial", "deps": ..., "spec": ...}
    self.jobs = {}
    # Task ids are moved between these four lists as their state changes.
    self.pendingJobs = []
    self.runningJobs = []
    self.doneJobs = []
    self.brokenJobs = []
    # Also used as the count of workers still alive: see __releaseWorker.
    self.parallelThreads = parallelThreads
    self.logDelegate = logDelegate
    # taskId -> error description for every broken job.
    self.errors = {}
    if not logDelegate:
      self.logDelegate = self.__doLog

  def run(self):
    """Start the workers and process master commands until all have quit."""
    for i in range(self.parallelThreads):
      t = Thread(target=self.__createWorker())
      t.daemon = True
      t.start()

    self.notifyMaster(self.__rescheduleParallel)
    # Wait until all the workers are done.
    while self.parallelThreads:
      try:
        who, item = self.resultsQueue.get()
        item[0](*item[1:])
        # Throttles the loop: a serial job waiting for its dependencies
        # re-posts itself immediately (see doSerial), which would otherwise
        # make the master spin.
        sleep(0.1)
      except KeyboardInterrupt:
        print("Ctrl-c received, waiting for workers to finish")
        # Drop the work which has not been picked up yet, so that the quit
        # commands posted below are the next thing each worker sees.
        self.__discardQueuedWork()
        self.shout(self.quit)

    # Prune the queue: execute the master commands (mostly log messages) which
    # were still queued when the last worker was released. The number of
    # commands is fixed up front because a command may post further commands
    # (doSerial re-posts itself while it has pending dependencies), so
    # "loop until empty" is not guaranteed to terminate.
    for i in range(self.resultsQueue.qsize()):
      who, item = self.resultsQueue.get()
      item[0](*item[1:])
    return

  def __discardQueuedWork(self):
    """Remove everything currently sitting in the workers queue.

    The queue is unbounded, so Queue.full() is never true and cannot be used
    to detect leftovers. Workers may be consuming concurrently, therefore the
    queue is drained with non-blocking gets until it reports Empty rather
    than by testing empty() first. Discarded task ids are left in
    runningJobs.
    """
    try:
      from Queue import Empty  # Python 2 module name.
    except ImportError:
      from queue import Empty  # Python 3 module name.
    try:
      while True:
        self.workersQueue.get(False)
    except Empty:
      pass

  def __createWorker(self):
    """Return the function executed by each worker thread."""
    def worker():
      while True:
        taskId, item = self.workersQueue.get()
        try:
          result = item[0](*item[1:])
        except Exception:
          # The formatted traceback becomes the (true, hence failing) result.
          result = traceback.format_exc()

        if isinstance(result, _SchedulerQuitCommand):
          self.notifyMaster(self.__releaseWorker)
          return
        self.log(str(item) + " done")
        self.notifyMaster(self.__updateJobStatus, taskId, result)
        self.notifyMaster(self.__rescheduleParallel)
        # Only in 2.5: self.workersQueue.task_done()
    return worker

  def __releaseWorker(self):
    """Account for one worker having quit; run() stops when none is left."""
    self.parallelThreads -= 1

  def parallel(self, taskId, deps, *spec):
    """Register a job to be executed by a worker thread.

    Args:
      taskId: identifier of the job.
      deps: ids of the jobs which must be done before this one is started.
      *spec: callable followed by its arguments.

    A task id which is already pending is logged as a double task but is
    still registered again.
    """
    if taskId in self.pendingJobs:
      self.log("Double task %s" % taskId)
    self.jobs[taskId] = {"scheduler": "parallel", "deps": deps, "spec": spec}
    self.pendingJobs.append(taskId)

  def __rescheduleParallel(self):
    """Do one round of scheduling of the pending parallel jobs.

    The name is private (mangled); code outside this class should go through
    reschedule().
    """
    parallelJobs = [j for j in self.pendingJobs if self.jobs[j]["scheduler"] == "parallel"]
    # First of all clean up the pending parallel jobs from all those
    # which have broken dependencies.
    for taskId in parallelJobs:
      brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs]
      if not brokenDeps:
        continue
      transition(taskId, self.pendingJobs, self.brokenJobs)
      self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)

    # If no tasks left, quit. Notice we need to check also for serial jobs
    # since they might queue more parallel payloads.
    if not self.pendingJobs:
      self.shout(self.quit)
      self.notifyMaster(self.quit)
      return

    # Otherwise do another round of scheduling of all the tasks. In this
    # case we only queue parallel jobs to the parallel queue. Jobs which were
    # just marked as broken are skipped here as well, because their broken
    # dependency is by construction not in the done list.
    for taskId in parallelJobs:
      pendingDeps = [dep for dep in self.jobs[taskId]["deps"] if dep not in self.doneJobs]
      if pendingDeps:
        continue
      # No broken dependencies and no pending ones. We can continue.
      transition(taskId, self.pendingJobs, self.runningJobs)
      self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])

  def __updateJobStatus(self, taskId, error):
    """Move a running job to done (false `error`) or broken (true `error`)."""
    if not error:
      transition(taskId, self.runningJobs, self.doneJobs)
      return
    transition(taskId, self.runningJobs, self.brokenJobs)
    self.errors[taskId] = error

  def __scheduleParallel(self, taskId, commandSpec):
    """Post one task on the workers queue."""
    self.workersQueue.put((taskId, commandSpec))

  def shout(self, *commandSpec):
    """Post the same command once for every worker still accounted for."""
    for x in range(self.parallelThreads):
      self.__scheduleParallel("quit-" + str(x), commandSpec)

  def notifyMaster(self, *commandSpec):
    """Post a command (callable followed by arguments) for the master thread."""
    self.resultsQueue.put((threading.current_thread(), commandSpec))

  def serial(self, taskId, deps, *commandSpec):
    """Register a job to be executed in the master thread itself.

    Args:
      taskId: identifier of the job.
      deps: ids of the jobs which must be done before this one is run.
      *commandSpec: callable followed by its arguments.
    """
    spec = [self.doSerial, taskId, deps] + list(commandSpec)
    # Register the job before posting it, so that the master can never
    # execute doSerial for a task id which is not in pendingJobs yet.
    self.jobs[taskId] = {"scheduler": "serial", "deps": deps, "spec": spec}
    self.pendingJobs.append(taskId)
    self.resultsQueue.put((threading.current_thread(), spec))

  def doSerial(self, taskId, deps, *commandSpec):
    """Master-side handler of a serial job.

    Marks the job as broken if any dependency is broken, re-posts itself if
    some dependency is not done yet, and otherwise runs the command in the
    calling thread and records its outcome.
    """
    brokenDeps = [dep for dep in deps if dep in self.brokenJobs]
    if brokenDeps:
      transition(taskId, self.pendingJobs, self.brokenJobs)
      self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
      # Remember to do the scheduling again!
      self.notifyMaster(self.__rescheduleParallel)
      return

    # Put back the task on the queue, since it has pending dependencies.
    pendingDeps = [dep for dep in deps if dep not in self.doneJobs]
    if pendingDeps:
      self.notifyMaster(self.doSerial, taskId, deps, *commandSpec)
      return
    # No broken dependencies and no pending ones. Run the job.
    transition(taskId, self.pendingJobs, self.runningJobs)
    try:
      result = commandSpec[0](*commandSpec[1:])
    except Exception:
      # The formatted traceback becomes the (true, hence failing) result.
      result = traceback.format_exc()
    self.__updateJobStatus(taskId, result)
    # Remember to do the scheduling again!
    self.notifyMaster(self.__rescheduleParallel)

  def log(self, s):
    """Ask the master thread to pass `s` to the log delegate."""
    self.notifyMaster(self.logDelegate, s)

  def quit(self):
    """Task which forces a worker to quit."""
    self.log("Requested to quit.")
    return _SchedulerQuitCommand()

  def __doLog(self, s):
    """Default log delegate: print the message."""
    print(s)

  def reschedule(self):
    """Ask the master thread for another round of scheduling."""
    self.notifyMaster(self.__rescheduleParallel)
00204 
def dummyTask():
  """Stand-in for a short job: pause for a tenth of a second and succeed."""
  sleep(0.1)
  return None
00207 
def dummyTaskLong():
  """Stand-in for a longer job: pause for one second and succeed."""
  sleep(1)
  return None
00210 
def errorTask():
  """Stand-in for a failing job: hand back a non-empty error description."""
  message = "This will always have an error"
  return message
00213 
def exceptionTask():
  """Stand-in for a crashing job: always raises Exception("foo")."""
  failure = Exception("foo")
  raise failure
00216 
00217 # Mimics cmsBuild workflow.
def scheduleMore(scheduler):
  """Queue a download -> build -> install chain on `scheduler`.

  The first two steps are registered as parallel jobs, the last one as a
  serial job; each step depends on the one before it.
  """
  parallelSteps = (("download", []), ("build", ["download"]))
  for taskId, deps in parallelSteps:
    scheduler.parallel(taskId, deps, dummyTask)
  scheduler.serial("install", ["build"], dummyTask)
00222 
# Self-test, executed only when the file is run as a script. Each section
# builds a fresh Scheduler, registers some jobs, runs it and checks the
# resulting job lists.
if __name__ == "__main__":
  # Schedulers with nothing to do must simply terminate.
  scheduler = Scheduler(10)
  scheduler.run()

  scheduler = Scheduler(1)
  scheduler.run()

  # A single job which only logs a message.
  scheduler = Scheduler(10)
  scheduler.parallel("test", [], scheduler.log, "This is england")
  scheduler.run()

  # More independent jobs than workers.
  scheduler = Scheduler(10)
  for x in range(50):
    scheduler.parallel("test" + str(x), [], dummyTask)
  scheduler.run()
  assert(len(scheduler.brokenJobs) == 0)
  assert(len(scheduler.jobs) == 50)

  # A job returning an error ends up in the broken list.
  scheduler = Scheduler(1)
  scheduler.parallel("test", [], errorTask)
  scheduler.run()
  assert(len(scheduler.brokenJobs) == 1)
  assert(len(scheduler.runningJobs) == 0)
  assert(len(scheduler.doneJobs) == 0)

  # Check dependency actually works: test2 is registered first but must
  # complete after test1.
  scheduler = Scheduler(10)
  scheduler.parallel("test2", ["test1"], dummyTask)
  scheduler.parallel("test1", [], dummyTaskLong)
  scheduler.run()
  assert(scheduler.doneJobs == ["test1", "test2"])

  # Check a failure propagates along the dependency chain: test2 fails, so
  # test3 which depends on it must be marked as broken too.
  scheduler = Scheduler(10)
  scheduler.parallel("test3", ["test2"], dummyTask)
  scheduler.parallel("test2", ["test1"], errorTask)
  scheduler.parallel("test1", [], dummyTaskLong)
  scheduler.run()
  assert(scheduler.doneJobs == ["test1"])
  assert(scheduler.brokenJobs == ["test2", "test3"])

  # Check ctrl-C will exit properly. This section is interactive: without a
  # Ctrl-C it just runs all the 250 jobs.
  scheduler = Scheduler(2)
  for x in range(250):
    scheduler.parallel("test" + str(x), [], dummyTask)
  print("Press Control-C to continue")
  scheduler.run()

  # Handle tasks with exceptions.
  scheduler = Scheduler(2)
  scheduler.parallel("test", [], exceptionTask)
  scheduler.run()
  assert(scheduler.errors["test"])

  # Handle tasks which depend on tasks with exceptions.
  scheduler = Scheduler(2)
  scheduler.parallel("test0", [], dummyTask)
  scheduler.parallel("test1", [], exceptionTask)
  scheduler.parallel("test2", ["test1"], dummyTask)
  scheduler.run()
  assert(scheduler.errors["test1"])
  assert(scheduler.errors["test2"])

  # Handle serial execution tasks.
  scheduler = Scheduler(2)
  scheduler.serial("test0", [], dummyTask)
  scheduler.run()
  assert(scheduler.doneJobs == ["test0"])

  # Handle serial execution tasks, one depends from
  # the previous one.
  scheduler = Scheduler(2)
  scheduler.serial("test0", [], dummyTask)
  scheduler.serial("test1", ["test0"], dummyTask)
  scheduler.run()
  assert(scheduler.doneJobs == ["test0", "test1"])

  # Serial tasks depending on one another, registered in reverse order.
  scheduler = Scheduler(2)
  scheduler.serial("test1", ["test0"], dummyTask)
  scheduler.serial("test0", [], dummyTask)
  scheduler.run()
  assert(scheduler.doneJobs == ["test0", "test1"])

  # Serial and parallel tasks being scheduled at the same time. The order of
  # completion is not fixed here, hence the sort before comparing.
  scheduler = Scheduler(2)
  scheduler.serial("test1", ["test0"], dummyTask)
  scheduler.serial("test0", [], dummyTask)
  scheduler.parallel("test2", [], dummyTask)
  scheduler.parallel("test3", [], dummyTask)
  scheduler.run()
  scheduler.doneJobs.sort()
  assert(scheduler.doneJobs == ["test0", "test1", "test2", "test3"])

  # Serial and parallel tasks. Parallel depends on serial.
  scheduler = Scheduler(2)
  scheduler.serial("test1", ["test0"], dummyTask)
  scheduler.serial("test0", [], dummyTask)
  scheduler.parallel("test2", ["test1"], dummyTask)
  scheduler.parallel("test3", ["test2"], dummyTask)
  scheduler.run()
  assert(scheduler.doneJobs == ["test0", "test1", "test2", "test3"])

  # Serial task scheduling two parallel task and another dependent
  # serial task. This is actually what needs to be done for building
  # packages. I.e.
  # The first serial task is responsible for checking if a package is already there,
  # then it queues a parallel download sources task, a subsequent build sources
  # one and finally the install built package one.
  scheduler = Scheduler(3)
  scheduler.serial("check-pkg", [], scheduleMore, scheduler)
  scheduler.run()
  assert(scheduler.doneJobs == ["check-pkg", "download", "build", "install"])