CMS 3D CMS Logo

Public Member Functions | Public Attributes | Private Member Functions

scheduler::Scheduler Class Reference

List of all members.

Public Member Functions

def __init__
def doSerial
def log
def notifyMaster
def parallel
def quit
def reschedule
def run
def serial
def shout

Public Attributes

 brokenJobs
 doneJobs
 errors
 finalJobDeps
 finalJobSpec
 jobs
 logDelegate
 parallelThreads
 pendingJobs
 resultsQueue
 runningJobs
 workersQueue

Private Member Functions

def __createWorker
def __doLog
def __releaseWorker
def __rescheduleParallel
def __scheduleParallel
def __updateJobStatus

Detailed Description

Definition at line 22 of file scheduler.py.


Constructor & Destructor Documentation

def scheduler::Scheduler::__init__ (self, parallelThreads, logDelegate = None)

Definition at line 40 of file scheduler.py.

00041                                                        :
00042     self.workersQueue = Queue()
00043     self.resultsQueue = Queue()
00044     self.jobs = {}
00045     self.pendingJobs = []
00046     self.runningJobs = []
00047     self.doneJobs = []
00048     self.brokenJobs = []
00049     self.parallelThreads = parallelThreads
00050     self.logDelegate = logDelegate
00051     self.errors = {}
00052     if not logDelegate:
00053       self.logDelegate = self.__doLog 
00054     # Add a final job, which will depend on any spawned task so that we do not
00055     # terminate until we are completely done.
00056     self.finalJobDeps = []
00057     self.finalJobSpec = [self.doSerial, "final-job", self.finalJobDeps] + [self.__doLog, "Nothing else to be done, exiting."]
00058     self.resultsQueue.put((threading.currentThread(), self.finalJobSpec))
00059     self.jobs["final-job"] = {"scheduler": "serial", "deps": self.finalJobSpec, "spec": self.finalJobSpec}
00060     self.pendingJobs.append("final-job")


Member Function Documentation

def scheduler::Scheduler::__createWorker (self) [private]

Definition at line 88 of file scheduler.py.

00089                           :
00090     def worker():
00091       while True:
00092         taskId, item = self.workersQueue.get()
00093         try:
00094           result = item[0](*item[1:])
00095         except Exception, e:
00096           s = StringIO()
00097           traceback.print_exc(file=s)
00098           result = s.getvalue()
00099           
00100         if type(result) == _SchedulerQuitCommand:
00101           self.notifyMaster(self.__releaseWorker)
00102           return
00103         self.log(str(item) + " done")
00104         self.notifyMaster(self.__updateJobStatus, taskId, result)
00105         self.notifyMaster(self.__rescheduleParallel)
00106         # Only in 2.5: self.workersQueue.task_done()
00107     return worker
  
def scheduler::Scheduler::__doLog (self, s) [private]

Definition at line 211 of file scheduler.py.

00212                       :
00213     print s

def scheduler::Scheduler::__releaseWorker (self) [private]

Definition at line 108 of file scheduler.py.

00109                            :
00110     self.parallelThreads -= 1

def scheduler::Scheduler::__rescheduleParallel (self) [private]

Definition at line 119 of file scheduler.py.

00120                                 :
00121     parallelJobs = [j for j in self.pendingJobs if self.jobs[j]["scheduler"] == "parallel"]
00122     # First of all clean up the pending parallel jobs from all those
00123     # which have broken dependencies.
00124     for taskId in parallelJobs:
00125       brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs]
00126       if not brokenDeps:
00127         continue
00128       transition(taskId, self.pendingJobs, self.brokenJobs)
00129       self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
00130 
00131     # If no tasks left, quit. Notice we need to check also for serial jobs
00132     # since they might queue more parallel payloads.
00133     if not self.pendingJobs:
00134       self.shout(self.quit)
00135       self.notifyMaster(self.quit)
00136       return
00137 
00138     # Otherwise do another round of scheduling of all the tasks. In this
00139     # case we only queue parallel jobs to the parallel queue.
00140     for taskId in parallelJobs:
00141       pendingDeps = [dep for dep in self.jobs[taskId]["deps"] if not dep in self.doneJobs]
00142       if pendingDeps:
00143         continue
00144       # No broken dependencies and no pending ones. we can continue.
00145       transition(taskId, self.pendingJobs, self.runningJobs)
00146       self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])

def scheduler::Scheduler::__scheduleParallel (self, taskId, commandSpec) [private]

Definition at line 156 of file scheduler.py.

00157                                                    :
00158     self.workersQueue.put((taskId, commandSpec))

def scheduler::Scheduler::__updateJobStatus (self, taskId, error) [private]

Definition at line 148 of file scheduler.py.

00149                                             :
00150     if not error:
00151       transition(taskId, self.runningJobs, self.doneJobs)
00152       return
00153     transition(taskId, self.runningJobs, self.brokenJobs)
00154     self.errors[taskId] = error
  
def scheduler::Scheduler::doSerial (self, taskId, deps, commandSpec)

Definition at line 175 of file scheduler.py.

00176                                                 :
00177     brokenDeps = [dep for dep in deps if dep in self.brokenJobs]
00178     if brokenDeps:
00179       transition(taskId, self.pendingJobs, self.brokenJobs)
00180       self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
00181       # Remember to do the scheduling again!
00182       self.notifyMaster(self.__rescheduleParallel)
00183       return
00184     
00185     # Put back the task on the queue, since it has pending dependencies.
00186     pendingDeps = [dep for dep in deps if not dep in self.doneJobs]
00187     if pendingDeps:
00188       self.resultsQueue.put((threading.currentThread(), [self.doSerial, taskId, deps] + list(commandSpec)))
00189       return
00190     # No broken dependencies and no pending ones. Run the job.
00191     transition(taskId, self.pendingJobs, self.runningJobs)
00192     try:
00193       result = commandSpec[0](*commandSpec[1:])
00194     except Exception, e:
00195       s = StringIO()
00196       traceback.print_exc(file=s)
00197       result = s.getvalue()
00198     self.__updateJobStatus(taskId, result)
00199     # Remember to do the scheduling again!
00200     self.notifyMaster(self.__rescheduleParallel)
  
def scheduler::Scheduler::log (self, s)

Definition at line 202 of file scheduler.py.

00203                   :
00204     self.notifyMaster(self.logDelegate, s)

def scheduler::Scheduler::notifyMaster (self, *commandSpec)

Definition at line 165 of file scheduler.py.

00166                                       :
00167     self.resultsQueue.put((threading.currentThread(), commandSpec))

def scheduler::Scheduler::parallel (self, taskId, deps, spec)

Definition at line 111 of file scheduler.py.

00112                                          :
00113     if taskId in self.pendingJobs:
00114       self.log("Double task %s" % taskId)
00115     self.jobs[taskId] = {"scheduler": "parallel", "deps": deps, "spec":spec}
00116     self.pendingJobs.append(taskId)
00117     self.finalJobDeps.append(taskId)

def scheduler::Scheduler::quit (self)

Definition at line 206 of file scheduler.py.

00207                 :
00208     self.log("Requested to quit.")
00209     return _SchedulerQuitCommand()

def scheduler::Scheduler::reschedule (self)

Definition at line 214 of file scheduler.py.

00215                       :
00216     self.notifyMaster(self.__rescheduleParallel)

def scheduler::Scheduler::run (self)

Definition at line 61 of file scheduler.py.

00062                :
00063     for i in xrange(self.parallelThreads):
00064       t = Thread(target=self.__createWorker())
00065       t.daemon = True
00066       t.start()
00067     
00068     self.notifyMaster(self.__rescheduleParallel)
00069     # Wait until all the workers are done.
00070     while self.parallelThreads:
00071       try:
00072         who, item = self.resultsQueue.get()
00073         #print who, item
00074         item[0](*item[1:])
00075         sleep(0.1)
00076       except KeyboardInterrupt:
00077         print "Ctrl-c received, waiting for workers to finish"
00078         while self.workersQueue.full():
00079           self.workersQueue.get(False)
00080         self.shout(self.quit)
00081     
00082     # Prune the queue.
00083     while self.resultsQueue.full():
00084       item = self.resultsQueue.get() 
00085       item[0](*item[1:])
00086     return

def scheduler::Scheduler::serial (self, taskId, deps, commandSpec)

Definition at line 168 of file scheduler.py.

00169                                               :
00170     spec = [self.doSerial, taskId, deps] + list(commandSpec)
00171     self.resultsQueue.put((threading.currentThread(), spec))
00172     self.jobs[taskId] = {"scheduler": "serial", "deps": deps, "spec": spec}
00173     self.pendingJobs.append(taskId)
00174     self.finalJobDeps.append(taskId)

def scheduler::Scheduler::shout (self, *commandSpec)

Definition at line 160 of file scheduler.py.

00161                                :
00162     for x in xrange(self.parallelThreads):
00163       self.__scheduleParallel("quit-" + str(x), commandSpec)


Member Data Documentation

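scheduler::Scheduler::brokenJobs

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::doneJobs

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::errors

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::finalJobDeps

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::finalJobSpec

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::jobs

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::logDelegate

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::parallelThreads

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::pendingJobs

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::resultsQueue

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::runningJobs

Definition at line 40 of file scheduler.py.

scheduler::Scheduler::workersQueue

Definition at line 40 of file scheduler.py.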