#!/usr/bin/python
#
# Written by Evan Jones
# http://evanjones.ca/software/python-workqueue.html

"""Worker queues for Python (also called thread pools).

This module queues work and distributes it over a set of slave objects. The
slaves are callable instances that get called in their own thread. I use it
with XML-RPC server instances to distribute work over a cluster of machines.

The enqueue() function is used to queue function calls. The dequeue()
function returns the results of actually calling the function. This permits
the execution of the function to happen in the background in another thread.
The order that results are returned by dequeue() is not the same as the order
work is placed into the queue with enqueue().

This module uses "keys" to track specific work items. When enqueuing a work
item, a key is also required. The key does not affect the processing at all.
It is simply returned when dequeue() returns the result of calling the
function. This helps the caller to determine which function call produced
which result.
"""

import collections
import threading


class SerializedWorkQueue(object):
    """Implements the WorkQueue interface in a serialized fashion.

    This is very useful for debugging since there is no parallelism. All the
    work is performed when calling dequeue().
    """

    __slots__ = ('function', 'queue')

    def __init__(self, slaveFunction):
        """Creates a new SerializedWorkQueue that calls the given function.

        slaveFunction is either a callable, or (for symmetry with WorkQueue,
        which takes a collection of slaves) a sized container holding exactly
        one callable.
        """
        self.function = slaveFunction
        if hasattr(slaveFunction, '__len__'):
            assert len(slaveFunction) == 1, \
                "SerializedWorkQueue accepts exactly one slave"
            self.function = slaveFunction[0]
        # Pending (key, parameters) work items.
        self.queue = []

    def enqueue(self, key, *parameters):
        """Adds a new work item to the queue.

        This does not actually execute the function; that happens in
        dequeue().
        """
        self.queue.append((key, parameters))

    def dequeue(self):
        """Returns a (key, result) pair, or None if the queue is empty.

        This actually causes the function to be executed, in the calling
        thread. Any exception the function raises propagates to the caller.
        """
        if not self.queue:
            return None

        # Take from the tail (much faster than the head for a list). Result
        # order is explicitly unspecified, so LIFO is acceptable.
        key, parameters = self.queue.pop()
        return key, self.function(*parameters)

    def __iter__(self):
        """Returns an iterator over the (key, result) pairs from dequeue().

        The iterator stops when there are no more items in the queue.
        """
        result = self.dequeue()
        while result is not None:
            yield result
            result = self.dequeue()


class WorkQueue(object):
    """Distributes work over a set of slaves.

    Each slave is a callable object that will be called from a separate
    thread. At most one thread runs per slave at any time; threads are
    started on demand by enqueue() and exit once no work is pending.

    If a slave raises, the exception ends that worker thread (and is reported
    by the threading machinery); no result is produced for that work item,
    but the queue's bookkeeping stays consistent so dequeue() never hangs.
    """

    def __init__(self, slaves):
        """Creates a WorkQueue that distributes calls over `slaves`.

        slaves is an iterable of callables; each is invoked as
        slave(*parameters) for the parameters given to enqueue().
        """
        # Guards every attribute below; also used to signal dequeue().
        self.lock = threading.Condition()

        # Slave objects that are currently unused
        self.slaveQueue = list(slaves)
        # Pending work: (key, parameters) tuples in FIFO order
        self.callQueue = collections.deque()
        # Pending results: (key, result) tuples in completion order
        self.resultsQueue = collections.deque()

        # Number of worker threads currently active
        self.activeCount = 0

    def _startSlaveThread(self):
        """Starts a worker thread for the first idle slave.

        The caller must hold self.lock and self.slaveQueue must be non-empty.
        """
        slave = self.slaveQueue[0]
        thread = threading.Thread(target=self.slaveThread, args=(slave,))
        thread.start()
        # Only update the bookkeeping once the thread really started. The new
        # thread cannot observe the intermediate state: it needs the lock,
        # which the caller still holds.
        del self.slaveQueue[0]
        self.activeCount += 1

    def enqueue(self, key, *parameters):
        """Adds a new work item to the queue.

        This may result in a thread being spawned, if a slave is idle.
        """
        with self.lock:
            self.callQueue.append((key, parameters))
            if self.slaveQueue:
                # There is an available slave: spawn a thread for it
                self._startSlaveThread()

    def __iter__(self):
        """Returns an iterator over the (key, result) pairs from dequeue().

        The iterator stops when there are no more results in the queue and no
        worker threads that could still produce one.
        """
        result = self.dequeue()
        while result is not None:
            yield result
            result = self.dequeue()

    def dequeue(self):
        """Returns a completed (key, result) pair, or None if none remain.

        If there are worker threads still working, this blocks until a result
        is available or until the last worker has exited.
        """
        with self.lock:
            while not self.resultsQueue:
                if self.activeCount == 0:
                    # No active threads, so no results are pending: quit
                    return None
                # Woken both when a result arrives and when a worker exits.
                self.lock.wait()
            return self.resultsQueue.popleft()

    def slaveThread(self, slaveServer):
        """Worker thread body: calls slaveServer for each pending work item.

        Runs until the call queue is empty, then returns slaveServer to the
        idle pool. If slaveServer raises, the exception propagates out of
        this method after the bookkeeping has been restored.
        """
        with self.lock:
            assert self.activeCount > 0
            try:
                while self.callQueue:
                    key, parameters = self.callQueue.popleft()

                    # Make the (possibly slow) call without holding the lock.
                    self.lock.release()
                    try:
                        results = slaveServer(*parameters)
                    finally:
                        # Always re-acquire: the enclosing `with` block and
                        # the bookkeeping below both require the lock.
                        self.lock.acquire()

                    self.resultsQueue.append((key, results))
                    # Tell a waiting dequeue() that a result is available
                    self.lock.notify()
            finally:
                # This thread is done (normally or because the slave raised):
                # put the slave back in the idle pool.
                self.activeCount -= 1
                assert self.activeCount >= 0
                self.slaveQueue.append(slaveServer)
                try:
                    if self.callQueue:
                        # Only reachable when the slave raised with work
                        # still queued; start a new worker so those items are
                        # not stranded.
                        self._startSlaveThread()
                finally:
                    # Wake every waiter: a dequeue() blocked on "threads still
                    # active" must re-check, otherwise it could sleep forever
                    # when a worker exits without producing a result.
                    self.lock.notify_all()


if __name__ == "__main__":
    import unittest

    TEST_INPUT = [1, 2, 3, 'a', 'b', 'c']

    class verifyWorkQueue(unittest.TestCase):
        def work(self, parameter):
            return parameter

        def queueTester(self, wq):
            for i, value in enumerate(TEST_INPUT):
                wq.enqueue(i, value)

            remaining = list(TEST_INPUT)
            for key, out in wq:
                # Each key must come back paired with its own result.
                self.assertEqual(TEST_INPUT[key], out)
                self.assertIn(out, remaining)
                remaining.remove(out)
            self.assertEqual(remaining, [])

        def testQueueBasic(self):
            self.queueTester(WorkQueue([self.work]))

        def testQueueConcurrent(self):
            self.queueTester(
                WorkQueue([self.work, self.work, self.work, self.work]))

        def testSerialized(self):
            self.queueTester(SerializedWorkQueue(self.work))

    unittest.main()