CMS 3D CMS Logo

Public Member Functions | Public Attributes | Private Member Functions

scheduler::Scheduler Class Reference

List of all members.

Public Member Functions

def __init__
def doSerial
def log
def notifyMaster
def parallel
def quit
def reschedule
def run
def serial
def shout

Public Attributes

 brokenJobs
 doneJobs
 errors
 jobs
 logDelegate
 parallelThreads
 pendingJobs
 resultsQueue
 runningJobs
 workersQueue

Private Member Functions

def __createWorker
def __doLog
def __releaseWorker
def __rescheduleParallel
def __scheduleParallel
def __updateJobStatus

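Detailed Description

Job scheduler that tracks tasks by id together with their dependencies. Tasks registered with parallel() are dispatched through workersQueue to the worker threads started by run(), while tasks registered with serial(), as well as status updates and log messages sent with notifyMaster(), are queued on resultsQueue and executed by the loop in run().

Definition at line 22 of file scheduler.py.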


Constructor & Destructor Documentation

def scheduler::Scheduler::__init__ (   self,
  parallelThreads,
  logDelegate = None 
)

Definition at line 37 of file scheduler.py.

00038                                                        :
00039     self.workersQueue = Queue()
00040     self.resultsQueue = Queue()
00041     self.jobs = {}
00042     self.pendingJobs = []
00043     self.runningJobs = []
00044     self.doneJobs = []
00045     self.brokenJobs = []
00046     self.parallelThreads = parallelThreads
00047     self.logDelegate = logDelegate
00048     self.errors = {}
00049     if not logDelegate:
00050       self.logDelegate = self.__doLog 


Member Function Documentation

def scheduler::Scheduler::__createWorker (   self) [private]

Definition at line 78 of file scheduler.py.

00079                           :
00080     def worker():
00081       while True:
00082         taskId, item = self.workersQueue.get()
00083         try:
00084           result = item[0](*item[1:])
00085         except Exception, e:
00086           s = StringIO()
00087           traceback.print_exc(file=s)
00088           result = s.getvalue()
00089           
00090         if type(result) == _SchedulerQuitCommand:
00091           self.notifyMaster(self.__releaseWorker)
00092           return
00093         self.log(str(item) + " done")
00094         self.notifyMaster(self.__updateJobStatus, taskId, result)
00095         self.notifyMaster(self.__rescheduleParallel)
00096         # Only in 2.5: self.workersQueue.task_done()
00097     return worker
  
def scheduler::Scheduler::__doLog (   self,
  s 
) [private]

Definition at line 199 of file scheduler.py.

00200                       :
00201     print s

def scheduler::Scheduler::__releaseWorker (   self) [private]

Definition at line 98 of file scheduler.py.

00099                            :
00100     self.parallelThreads -= 1

def scheduler::Scheduler::__rescheduleParallel (   self) [private]

Definition at line 108 of file scheduler.py.

00109                                 :
00110     parallelJobs = [j for j in self.pendingJobs if self.jobs[j]["scheduler"] == "parallel"]
00111     # First of all clean up the pending parallel jobs from all those
00112     # which have broken dependencies.
00113     for taskId in parallelJobs:
00114       brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs]
00115       if not brokenDeps:
00116         continue
00117       transition(taskId, self.pendingJobs, self.brokenJobs)
00118       self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
00119 
00120     # If no tasks left, quit. Notice we need to check also for serial jobs
00121     # since they might queue more parallel payloads.
00122     if not self.pendingJobs:
00123       self.shout(self.quit)
00124       self.notifyMaster(self.quit)
00125       return
00126 
00127     # Otherwise do another round of scheduling of all the tasks. In this
00128     # case we only queue parallel jobs to the parallel queue.
00129     for taskId in parallelJobs:
00130       pendingDeps = [dep for dep in self.jobs[taskId]["deps"] if not dep in self.doneJobs]
00131       if pendingDeps:
00132         continue
00133       # No broken dependencies and no pending ones. we can continue.
00134       transition(taskId, self.pendingJobs, self.runningJobs)
00135       self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])

def scheduler::Scheduler::__scheduleParallel (   self,
  taskId,
  commandSpec 
) [private]

Definition at line 145 of file scheduler.py.

00146                                                    :
00147     self.workersQueue.put((taskId, commandSpec))

def scheduler::Scheduler::__updateJobStatus (   self,
  taskId,
  error 
) [private]

Definition at line 137 of file scheduler.py.

00138                                             :
00139     if not error:
00140       transition(taskId, self.runningJobs, self.doneJobs)
00141       return
00142     transition(taskId, self.runningJobs, self.brokenJobs)
00143     self.errors[taskId] = error
  
def scheduler::Scheduler::doSerial (   self,
  taskId,
  deps,
  *commandSpec 
)

Definition at line 163 of file scheduler.py.

00164                                                 :
00165     brokenDeps = [dep for dep in deps if dep in self.brokenJobs]
00166     if brokenDeps:
00167       transition(taskId, self.pendingJobs, self.brokenJobs)
00168       self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
00169       # Remember to do the scheduling again!
00170       self.notifyMaster(self.__rescheduleParallel)
00171       return
00172     
00173     # Put back the task on the queue, since it has pending dependencies.
00174     pendingDeps = [dep for dep in deps if not dep in self.doneJobs]
00175     if pendingDeps:
00176       self.resultsQueue.put((threading.currentThread(), [self.doSerial, taskId, deps] + list(commandSpec)))
00177       return
00178     # No broken dependencies and no pending ones. Run the job.
00179     transition(taskId, self.pendingJobs, self.runningJobs)
00180     try:
00181       result = commandSpec[0](*commandSpec[1:])
00182     except Exception, e:
00183       s = StringIO()
00184       traceback.print_exc(file=s)
00185       result = s.getvalue()
00186     self.__updateJobStatus(taskId, result)
00187     # Remember to do the scheduling again!
00188     self.notifyMaster(self.__rescheduleParallel)
  
def scheduler::Scheduler::log (   self,
  s 
)

Definition at line 190 of file scheduler.py.

00191                   :
00192     self.notifyMaster(self.logDelegate, s)

def scheduler::Scheduler::notifyMaster (   self,
  *commandSpec 
)

Definition at line 154 of file scheduler.py.

00155                                       :
00156     self.resultsQueue.put((threading.currentThread(), commandSpec))

def scheduler::Scheduler::parallel (   self,
  taskId,
  deps,
  spec 
)

Definition at line 101 of file scheduler.py.

00102                                          :
00103     if taskId in self.pendingJobs:
00104       self.log("Double task %s" % taskId)
00105     self.jobs[taskId] = {"scheduler": "parallel", "deps": deps, "spec":spec}
00106     self.pendingJobs.append(taskId)

def scheduler::Scheduler::quit (   self)

Definition at line 194 of file scheduler.py.

00195                 :
00196     self.log("Requested to quit.")
00197     return _SchedulerQuitCommand()

def scheduler::Scheduler::reschedule (   self)

Definition at line 202 of file scheduler.py.

00203                       :
00204     self.notifyMaster(self.__rescheduleParallel)

def scheduler::Scheduler::run (   self)

Definition at line 51 of file scheduler.py.

00052                :
00053     for i in xrange(self.parallelThreads):
00054       t = Thread(target=self.__createWorker())
00055       t.daemon = True
00056       t.start()
00057     
00058     self.notifyMaster(self.__rescheduleParallel)
00059     # Wait until all the workers are done.
00060     while self.parallelThreads:
00061       try:
00062         who, item = self.resultsQueue.get()
00063         #print who, item
00064         item[0](*item[1:])
00065         sleep(0.1)
00066       except KeyboardInterrupt:
00067         print "Ctrl-c received, waiting for workers to finish"
00068         while self.workersQueue.full():
00069           self.workersQueue.get(False)
00070         self.shout(self.quit)
00071     
00072     # Prune the queue.
00073     while self.resultsQueue.full():
00074       item = self.resultsQueue.get() 
00075       item[0](*item[1:])
00076     return

def scheduler::Scheduler::serial (   self,
  taskId,
  deps,
  commandSpec 
)

Definition at line 157 of file scheduler.py.

00158                                               :
00159     spec = [self.doSerial, taskId, deps] + list(commandSpec)
00160     self.resultsQueue.put((threading.currentThread(), spec))
00161     self.jobs[taskId] = {"scheduler": "serial", "deps": deps, "spec": spec}
00162     self.pendingJobs.append(taskId)

def scheduler::Scheduler::shout (   self,
  *commandSpec 
)

Definition at line 149 of file scheduler.py.

00150                                :
00151     for x in xrange(self.parallelThreads):
00152       self.__scheduleParallel("quit-" + str(x), commandSpec)


Member Data Documentation

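scheduler::Scheduler::brokenJobs

Definition at line 37 of file scheduler.py.

scheduler::Scheduler::doneJobs

Definition at line 37 of file scheduler.py.

scheduler::Scheduler::errors

Definition at line 37 of file scheduler.py.

scheduler::Scheduler::jobs

Definition at line 37 of file scheduler.py.

scheduler::Scheduler::logDelegate

Definition at line 37 of file scheduler.py.

scheduler::Scheduler::parallelThreads

Definition at line 37 of file scheduler.py.

scheduler::Scheduler::pendingJobs

Definition at line 37 of file scheduler.py.

scheduler::Scheduler::resultsQueue

Definition at line 37 of file scheduler.py.

scheduler::Scheduler::runningJobs

Definition at line 37 of file scheduler.py.

scheduler::Scheduler::workersQueue

Definition at line 37 of file scheduler.py.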