Public Member Functions | |
def | __init__ |
def | doSerial |
def | log |
def | notifyMaster |
def | parallel |
def | quit |
def | reschedule |
def | run |
def | serial |
def | shout |
Public Attributes | |
brokenJobs | |
doneJobs | |
errors | |
finalJobDeps | |
finalJobSpec | |
jobs | |
logDelegate | |
parallelThreads | |
pendingJobs | |
resultsQueue | |
runningJobs | |
workersQueue | |
Private Member Functions | |
def | __createWorker |
def | __doLog |
def | __releaseWorker |
def | __rescheduleParallel |
def | __scheduleParallel |
def | __updateJobStatus |
Definition at line 22 of file scheduler.py.
def __init__(self, parallelThreads, logDelegate=None):
    """Create the queues, the job bookkeeping and the closing "final-job".

    Args:
        parallelThreads: Stored as ``self.parallelThreads``; ``run`` starts
            that many worker threads and loops while it is non-zero.
        logDelegate: Optional logging callable. When falsy, the scheduler's
            own ``__doLog`` is used instead.

    Source location: scheduler.py, line 40.
    """
    self.workersQueue = Queue()
    self.resultsQueue = Queue()
    # Job registry (taskId -> {"scheduler", "deps", "spec"}) and the lists
    # a task id moves between during its life.
    self.jobs = {}
    self.pendingJobs = []
    self.runningJobs = []
    self.doneJobs = []
    self.brokenJobs = []
    self.parallelThreads = parallelThreads
    self.logDelegate = logDelegate
    self.errors = {}
    if not logDelegate:
        self.logDelegate = self.__doLog
    # Add a final job, which will depend on any spawned task so that we do
    # not terminate until we are completely done.
    self.finalJobDeps = []
    # The spec shares the finalJobDeps list object on purpose, so the
    # queued doSerial call sees dependencies appended to it later.
    self.finalJobSpec = [self.doSerial, "final-job", self.finalJobDeps,
                         self.__doLog, "Nothing else to be done, exiting."]
    self.resultsQueue.put((threading.currentThread(), self.finalJobSpec))
    # "deps" must be the dependency list, not the command spec (the
    # original stored finalJobSpec under both keys).
    self.jobs["final-job"] = {"scheduler": "serial",
                              "deps": self.finalJobDeps,
                              "spec": self.finalJobSpec}
    self.pendingJobs.append("final-job")
def __createWorker(self):
    """Build and return the function that a worker thread runs.

    The returned ``worker`` loops forever: it takes a ``(taskId, item)``
    pair from ``self.workersQueue`` and calls ``item[0]`` with ``item[1:]``
    as positional arguments. If the call raises, the formatted traceback
    text becomes the result. A result that is a ``_SchedulerQuitCommand``
    makes the worker ask the master to run ``__releaseWorker`` and exit;
    any other result is logged and reported to the master, followed by a
    request for another scheduling round.

    Source location: scheduler.py, line 88.
    """
    def worker():
        while True:
            taskId, item = self.workersQueue.get()
            try:
                result = item[0](*item[1:])
            except Exception:
                # A failing payload must not kill the worker: hand the
                # traceback text back as the job's result instead.
                s = StringIO()
                traceback.print_exc(file=s)
                result = s.getvalue()

            # isinstance instead of type(...) ==: it also matches
            # subclasses and, on Python 2, instances of old-style classes
            # (whose type() is never the class itself).
            # NOTE(review): _SchedulerQuitCommand is defined outside this
            # listing - confirm how it is declared.
            if isinstance(result, _SchedulerQuitCommand):
                self.notifyMaster(self.__releaseWorker)
                return
            self.log(str(item) + " done")
            self.notifyMaster(self.__updateJobStatus, taskId, result)
            self.notifyMaster(self.__rescheduleParallel)
            # Only in 2.5: self.workersQueue.task_done()
    return worker
def scheduler::Scheduler::__doLog | ( | self, | |
s | |||
) | [private] |
Definition at line 211 of file scheduler.py.
def scheduler::Scheduler::__releaseWorker | ( | self | ) | [private] |
Definition at line 108 of file scheduler.py.
def __rescheduleParallel(self):
    """Run one scheduling round over the pending parallel jobs.

    Steps, in order:
      1. Every pending parallel job that has a dependency listed in
         ``self.brokenJobs`` is moved to ``self.brokenJobs`` and gets an
         entry in ``self.errors`` naming those dependencies.
      2. If ``self.pendingJobs`` is then empty, the quit command is sent
         through ``shout`` and ``notifyMaster`` and the round ends.
      3. Otherwise every parallel job whose dependencies are all in
         ``self.doneJobs`` is moved to ``self.runningJobs`` and handed to
         ``__scheduleParallel`` together with its stored spec.

    Source location: scheduler.py, line 119.
    """
    # Snapshot of the pending parallel jobs; both passes below walk this
    # same list even though the first pass may move jobs out of pending.
    candidates = []
    for jobId in self.pendingJobs:
        if self.jobs[jobId]["scheduler"] == "parallel":
            candidates.append(jobId)

    # First of all clean up the pending parallel jobs from all those
    # which have broken dependencies.
    for jobId in candidates:
        failedDeps = []
        for dep in self.jobs[jobId]["deps"]:
            if dep in self.brokenJobs:
                failedDeps.append(dep)
        if failedDeps:
            transition(jobId, self.pendingJobs, self.brokenJobs)
            self.errors[jobId] = "The following dependencies could not complete:\n%s" % "\n".join(failedDeps)

    # If no tasks left, quit. Notice we need to check also for serial jobs
    # since they might queue more parallel payloads.
    if not self.pendingJobs:
        self.shout(self.quit)
        self.notifyMaster(self.quit)
        return

    # Otherwise do another round of scheduling of all the tasks. In this
    # case we only queue parallel jobs to the parallel queue.
    for jobId in candidates:
        ready = True
        for dep in self.jobs[jobId]["deps"]:
            if dep not in self.doneJobs:
                ready = False
                break
        if ready:
            # No broken dependencies and no pending ones: start the job.
            transition(jobId, self.pendingJobs, self.runningJobs)
            self.__scheduleParallel(jobId, self.jobs[jobId]["spec"])
def scheduler::Scheduler::__scheduleParallel | ( | self, | |
taskId, | |||
commandSpec | |||
) | [private] |
Definition at line 156 of file scheduler.py.
def scheduler::Scheduler::__updateJobStatus | ( | self, | |
taskId, | |||
error | |||
) | [private] |
Definition at line 148 of file scheduler.py.
def doSerial(self, taskId, deps, *commandSpec):
    """Run a serial job on the calling thread once its dependencies allow.

    ``commandSpec`` is variadic: queued specs have the form
    ``[self.doSerial, taskId, deps, callable, arg, ...]`` and are invoked
    as ``item[0](*item[1:])``, so the callable and its arguments arrive
    as trailing positional arguments.

    Behaviour:
      * A dependency in ``self.brokenJobs`` moves the task to broken,
        records the reason in ``self.errors`` and requests a reschedule.
      * A dependency not yet in ``self.doneJobs`` puts the same call back
        on ``self.resultsQueue`` so it is retried later.
      * Otherwise the task moves to running, the command is executed (a
        raised exception becomes its traceback text), the outcome is
        passed to ``__updateJobStatus`` and a reschedule is requested.

    Source location: scheduler.py, line 175.
    """
    failedDeps = []
    for dep in deps:
        if dep in self.brokenJobs:
            failedDeps.append(dep)
    if failedDeps:
        transition(taskId, self.pendingJobs, self.brokenJobs)
        self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(failedDeps)
        # Remember to do the scheduling again!
        self.notifyMaster(self.__rescheduleParallel)
        return

    # Put back the task on the queue, since it has pending dependencies.
    for dep in deps:
        if dep not in self.doneJobs:
            requeued = [self.doSerial, taskId, deps]
            requeued.extend(commandSpec)
            self.resultsQueue.put((threading.currentThread(), requeued))
            return

    # No broken dependencies and no pending ones. Run the job.
    transition(taskId, self.pendingJobs, self.runningJobs)
    try:
        command = commandSpec[0]
        arguments = commandSpec[1:]
        outcome = command(*arguments)
    except Exception:
        buf = StringIO()
        traceback.print_exc(file=buf)
        outcome = buf.getvalue()
    self.__updateJobStatus(taskId, outcome)
    # Remember to do the scheduling again!
    self.notifyMaster(self.__rescheduleParallel)
def scheduler::Scheduler::log | ( | self, | |
s | |||
) |
Definition at line 202 of file scheduler.py.
def scheduler::Scheduler::notifyMaster | ( | self, | |
commandSpec | |||
) |
Definition at line 165 of file scheduler.py.
def scheduler::Scheduler::parallel | ( | self, | |
taskId, | |||
deps, | |||
spec | |||
) |
Definition at line 111 of file scheduler.py.
def scheduler::Scheduler::quit | ( | self | ) |
Definition at line 206 of file scheduler.py.
def scheduler::Scheduler::reschedule | ( | self | ) |
Definition at line 214 of file scheduler.py.
def run(self):
    """Start the worker threads and act as master until they are all gone.

    ``self.parallelThreads`` daemon threads are started, each running the
    function returned by ``__createWorker``. The calling thread then
    executes the ``(who, spec)`` entries arriving on ``self.resultsQueue``
    (``spec[0]`` called with ``spec[1:]``) for as long as
    ``self.parallelThreads`` is non-zero. On Ctrl-C the work still waiting
    in ``self.workersQueue`` is discarded and the quit command is sent
    through ``shout``. After the loop, entries already sitting in the
    results queue are executed once.

    Source location: scheduler.py, line 61.
    """
    # Queue.Empty lives in a differently named module on Python 2 and 3.
    try:
        from queue import Empty
    except ImportError:
        from Queue import Empty

    for i in range(self.parallelThreads):
        t = Thread(target=self.__createWorker())
        t.daemon = True
        t.start()

    self.notifyMaster(self.__rescheduleParallel)
    # Wait until all the workers are done.
    while self.parallelThreads:
        try:
            who, item = self.resultsQueue.get()
            item[0](*item[1:])
            sleep(0.1)
        except KeyboardInterrupt:
            print("Ctrl-c received, waiting for workers to finish")
            # Throw away work that has not been picked up yet. The
            # original guarded this with full(), which is always False
            # for an unbounded Queue, so nothing was ever discarded.
            # Workers may be draining the queue too, hence the
            # non-blocking get and the Empty guard.
            try:
                while True:
                    self.workersQueue.get(False)
            except Empty:
                pass
            self.shout(self.quit)

    # Prune the queue. Entries are (who, spec) pairs, exactly as in the
    # main loop. Only the entries present right now are executed, so a
    # command that re-queues itself cannot keep this loop alive.
    for _ in range(self.resultsQueue.qsize()):
        try:
            who, item = self.resultsQueue.get(False)
        except Empty:
            break
        item[0](*item[1:])
    return
def scheduler::Scheduler::serial | ( | self, | |
taskId, | |||
deps, | |||
commandSpec | |||
) |
Definition at line 168 of file scheduler.py.
def scheduler::Scheduler::shout | ( | self, | |
commandSpec | |||
) |
Definition at line 160 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.
Definition at line 40 of file scheduler.py.