Public Member Functions | |
def | __init__ |
def | doSerial |
def | log |
def | notifyMaster |
def | parallel |
def | quit |
def | reschedule |
def | run |
def | serial |
def | shout |
Public Attributes | |
brokenJobs | |
doneJobs | |
errors | |
jobs | |
logDelegate | |
parallelThreads | |
pendingJobs | |
resultsQueue | |
runningJobs | |
workersQueue | |
Private Member Functions | |
def | __createWorker |
def | __doLog |
def | __releaseWorker |
def | __rescheduleParallel |
def | __scheduleParallel |
def | __updateJobStatus |
Definition at line 22 of file scheduler.py.
def scheduler::Scheduler::__init__ | ( | self, | |
parallelThreads, | |||
logDelegate = None |
|||
) |
Definition at line 37 of file scheduler.py.
00038 : 00039 self.workersQueue = Queue() 00040 self.resultsQueue = Queue() 00041 self.jobs = {} 00042 self.pendingJobs = [] 00043 self.runningJobs = [] 00044 self.doneJobs = [] 00045 self.brokenJobs = [] 00046 self.parallelThreads = parallelThreads 00047 self.logDelegate = logDelegate 00048 self.errors = {} 00049 if not logDelegate: 00050 self.logDelegate = self.__doLog
def scheduler::Scheduler::__createWorker | ( | self | ) | [private] |
Definition at line 78 of file scheduler.py.
00079 : 00080 def worker(): 00081 while True: 00082 taskId, item = self.workersQueue.get() 00083 try: 00084 result = item[0](*item[1:]) 00085 except Exception, e: 00086 s = StringIO() 00087 traceback.print_exc(file=s) 00088 result = s.getvalue() 00089 00090 if type(result) == _SchedulerQuitCommand: 00091 self.notifyMaster(self.__releaseWorker) 00092 return 00093 self.log(str(item) + " done") 00094 self.notifyMaster(self.__updateJobStatus, taskId, result) 00095 self.notifyMaster(self.__rescheduleParallel) 00096 # Only in 2.5: self.workersQueue.task_done() 00097 return worker
def scheduler::Scheduler::__doLog | ( | self, | |
s | |||
) | [private] |
Definition at line 199 of file scheduler.py.
def scheduler::Scheduler::__releaseWorker | ( | self | ) | [private] |
Definition at line 98 of file scheduler.py.
def scheduler::Scheduler::__rescheduleParallel | ( | self | ) | [private] |
Definition at line 108 of file scheduler.py.
00109 : 00110 parallelJobs = [j for j in self.pendingJobs if self.jobs[j]["scheduler"] == "parallel"] 00111 # First of all clean up the pending parallel jobs from all those 00112 # which have broken dependencies. 00113 for taskId in parallelJobs: 00114 brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs] 00115 if not brokenDeps: 00116 continue 00117 transition(taskId, self.pendingJobs, self.brokenJobs) 00118 self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps) 00119 00120 # If no tasks left, quit. Notice we need to check also for serial jobs 00121 # since they might queue more parallel payloads. 00122 if not self.pendingJobs: 00123 self.shout(self.quit) 00124 self.notifyMaster(self.quit) 00125 return 00126 00127 # Otherwise do another round of scheduling of all the tasks. In this 00128 # case we only queue parallel jobs to the parallel queue. 00129 for taskId in parallelJobs: 00130 pendingDeps = [dep for dep in self.jobs[taskId]["deps"] if not dep in self.doneJobs] 00131 if pendingDeps: 00132 continue 00133 # No broken dependencies and no pending ones. we can continue. 00134 transition(taskId, self.pendingJobs, self.runningJobs) 00135 self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])
def scheduler::Scheduler::__scheduleParallel | ( | self, | |
taskId, | |||
commandSpec | |||
) | [private] |
Definition at line 145 of file scheduler.py.
def scheduler::Scheduler::__updateJobStatus | ( | self, | |
taskId, | |||
error | |||
) | [private] |
Definition at line 137 of file scheduler.py.
def scheduler::Scheduler::doSerial | ( | self, | |
taskId, | |||
deps, | |||
commandSpec | |||
) |
Definition at line 163 of file scheduler.py.
00164 : 00165 brokenDeps = [dep for dep in deps if dep in self.brokenJobs] 00166 if brokenDeps: 00167 transition(taskId, self.pendingJobs, self.brokenJobs) 00168 self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps) 00169 # Remember to do the scheduling again! 00170 self.notifyMaster(self.__rescheduleParallel) 00171 return 00172 00173 # Put back the task on the queue, since it has pending dependencies. 00174 pendingDeps = [dep for dep in deps if not dep in self.doneJobs] 00175 if pendingDeps: 00176 self.resultsQueue.put((threading.currentThread(), [self.doSerial, taskId, deps] + list(commandSpec))) 00177 return 00178 # No broken dependencies and no pending ones. Run the job. 00179 transition(taskId, self.pendingJobs, self.runningJobs) 00180 try: 00181 result = commandSpec[0](*commandSpec[1:]) 00182 except Exception, e: 00183 s = StringIO() 00184 traceback.print_exc(file=s) 00185 result = s.getvalue() 00186 self.__updateJobStatus(taskId, result) 00187 # Remember to do the scheduling again! 00188 self.notifyMaster(self.__rescheduleParallel)
def scheduler::Scheduler::log | ( | self, | |
s | |||
) |
Definition at line 190 of file scheduler.py.
def scheduler::Scheduler::notifyMaster | ( | self, | |
commandSpec | |||
) |
Definition at line 154 of file scheduler.py.
def scheduler::Scheduler::parallel | ( | self, | |
taskId, | |||
deps, | |||
spec | |||
) |
Definition at line 101 of file scheduler.py.
def scheduler::Scheduler::quit | ( | self | ) |
Definition at line 194 of file scheduler.py.
def scheduler::Scheduler::reschedule | ( | self | ) |
Definition at line 202 of file scheduler.py.
def scheduler::Scheduler::run | ( | self | ) |
Definition at line 51 of file scheduler.py.
00052 : 00053 for i in xrange(self.parallelThreads): 00054 t = Thread(target=self.__createWorker()) 00055 t.daemon = True 00056 t.start() 00057 00058 self.notifyMaster(self.__rescheduleParallel) 00059 # Wait until all the workers are done. 00060 while self.parallelThreads: 00061 try: 00062 who, item = self.resultsQueue.get() 00063 #print who, item 00064 item[0](*item[1:]) 00065 sleep(0.1) 00066 except KeyboardInterrupt: 00067 print "Ctrl-c received, waiting for workers to finish" 00068 while self.workersQueue.full(): 00069 self.workersQueue.get(False) 00070 self.shout(self.quit) 00071 00072 # Prune the queue. 00073 while self.resultsQueue.full(): 00074 item = self.resultsQueue.get() 00075 item[0](*item[1:]) 00076 return
def scheduler::Scheduler::serial | ( | self, | |
taskId, | |||
deps, | |||
commandSpec | |||
) |
Definition at line 157 of file scheduler.py.
def scheduler::Scheduler::shout | ( | self, | |
commandSpec | |||
) |
Definition at line 149 of file scheduler.py.
Definition at line 37 of file scheduler.py.
Definition at line 37 of file scheduler.py.
Definition at line 37 of file scheduler.py.
Definition at line 37 of file scheduler.py.
Definition at line 37 of file scheduler.py.
Definition at line 37 of file scheduler.py.
Definition at line 37 of file scheduler.py.
Definition at line 37 of file scheduler.py.
Definition at line 37 of file scheduler.py.
Definition at line 37 of file scheduler.py.