Queue – A thread-safe FIFO implementation

Purpose:Provides a thread-safe FIFO implementation
Available In:at least 1.4

The Queue module provides a FIFO implementation suitable for multi-threaded programming. It can be used to pass messages or other data between producer and consumer threads safely. Locking is handled for the caller, so it is simple to have as many threads as you want working with the same Queue instance. A Queue’s size (number of elements) may be restricted to throttle memory usage or processing.

Note

This discussion assumes you already understand the general nature of a queue. If you don’t, you may want to read some of the references before continuing.

Basic FIFO Queue

The Queue class implements a basic first-in, first-out container. Elements are added to one “end” of the sequence using put(), and removed from the other end using get().

import Queue

q = Queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print q.get()

This example uses a single thread to illustrate that elements are removed from the queue in the same order they are inserted.

$ python Queue_fifo.py

0
1
2
3
4

LIFO Queue

In contrast to the standard FIFO implementation of Queue, the LifoQueue uses last-in, first-out ordering (normally associated with a stack data structure).

import Queue

q = Queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print q.get()

The item most recently put() into the queue is removed by get().

$ python Queue_lifo.py

4
3
2
1
0

Priority Queue

Sometimes the processing order of the items in a queue needs to be based on characteristics of those items, rather than just the order they are created or added to the queue. For example, print jobs from the payroll department may take precedence over a code listing printed by a developer. PriorityQueue uses the sort order of the contents of the queue to decide which to retrieve.

import Queue

class Job(object):
    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print 'New job:', description
        return
    def __cmp__(self, other):
        return cmp(self.priority, other.priority)

q = Queue.PriorityQueue()

q.put( Job(3, 'Mid-level job') )
q.put( Job(10, 'Low-level job') )
q.put( Job(1, 'Important job') )

while not q.empty():
    next_job = q.get()
    print 'Processing job:', next_job.description
    

In this single-threaded example, the jobs are pulled out of the queue in strictly priority order. If there were multiple threads consuming the jobs, they would be processed based on the priority of items in the queue at the time get() was called.

$ python Queue_priority.py

New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job

Using Queues with Threads

As an example of how to use the Queue class with multiple threads, we can create a very simplistic podcasting client. This client reads one or more RSS feeds, queues up the enclosures for download, and processes several downloads in parallel using threads. It is simplistic and unsuitable for actual use, but the skeleton implementation gives us enough code to work with to provide an example of using the Queue module.

# System modules
from Queue import Queue
from threading import Thread
import time

# Local modules
import feedparser

# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data...
feed_urls = [ 'http://www.castsampler.com/cast/feed/rss/guest',
             ]


def downloadEnclosures(i, q):
    """This is the worker thread function.
    It processes items in the queue one after
    another.  These daemon threads go into an
    infinite loop, and only exit when
    the main thread ends.
    """
    while True:
        print '%s: Looking for the next enclosure' % i
        url = q.get()
        print '%s: Downloading:' % i, url
        # instead of really downloading the URL,
        # we just pretend and sleep
        time.sleep(i + 2)
        q.task_done()


# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
    worker = Thread(target=downloadEnclosures, args=(i, enclosure_queue,))
    worker.setDaemon(True)
    worker.start()

# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
    for entry in response['entries']:
        for enclosure in entry.get('enclosures', []):
            print 'Queuing:', enclosure['url']
            enclosure_queue.put(enclosure['url'])
        
# Now wait for the queue to be empty, indicating that we have
# processed all of the downloads.
print '*** Main thread waiting'
enclosure_queue.join()
print '*** Done'

First, we establish some operating parameters. Normally these would come from user inputs (preferences, a database, whatever). For our example we hard code the number of threads to use and the list of URLs to fetch.

Next, we need to define the function downloadEnclosures() that will run in the worker thread, processing the downloads. Again, for illustration purposes this only simulates the download. To actually download the enclosure, you might use urllib or urllib2. In this example, we simulate a download delay by sleeping a variable amount of time, depending on the thread id.

Once the threads’ target function is defined, we can start the worker threads. Notice that downloadEnclosures() will block on the statement url = q.get() until the queue has something to return, so it is safe to start the threads before there is anything in the queue.

The next step is to retrieve the feed contents (using Mark Pilgrim’s feedparser module) and enqueue the URLs of the enclosures. As soon as the first URL is added to the queue, one of the worker threads should pick it up and start downloading it. The loop below will continue to add items until the feed is exhausted, and the worker threads will take turns dequeuing URLs to download them.

And the only thing left to do is wait for the queue to empty out again, using join().

If you run the sample script, you should see output something like this:

0: Looking for the next enclosure
1: Looking for the next enclosure
Queuing: http://http.earthcache.net/htc-01.media.globix.net/COMP009996MOD1/Danny_Meyer.mp3
Queuing: http://feeds.feedburner.com/~r/drmoldawer/~5/104445110/moldawerinthemorning_show34_032607.mp3
Queuing: http://www.podtrac.com/pts/redirect.mp3/twit.cachefly.net/MBW-036.mp3
Queuing: http://media1.podtech.net/media/2007/04/PID_010848/Podtech_calacaniscast22_ipod.mp4
Queuing: http://media1.podtech.net/media/2007/03/PID_010592/Podtech_SXSW_KentBrewster_ipod.mp4
Queuing: http://media1.podtech.net/media/2007/02/PID_010171/Podtech_IDM_ChrisOBrien2.mp3
Queuing: http://feeds.feedburner.com/~r/drmoldawer/~5/96188661/moldawerinthemorning_show30_022607.mp3
*** Main thread waiting
0: Downloading: http://http.earthcache.net/htc-01.media.globix.net/COMP009996MOD1/Danny_Meyer.mp3
1: Downloading: http://feeds.feedburner.com/~r/drmoldawer/~5/104445110/moldawerinthemorning_show34_032607.mp3
0: Looking for the next enclosure
0: Downloading: http://www.podtrac.com/pts/redirect.mp3/twit.cachefly.net/MBW-036.mp3
1: Looking for the next enclosure
1: Downloading: http://media1.podtech.net/media/2007/04/PID_010848/Podtech_calacaniscast22_ipod.mp4
0: Looking for the next enclosure
0: Downloading: http://media1.podtech.net/media/2007/03/PID_010592/Podtech_SXSW_KentBrewster_ipod.mp4
0: Looking for the next enclosure
0: Downloading: http://media1.podtech.net/media/2007/02/PID_010171/Podtech_IDM_ChrisOBrien2.mp3
1: Looking for the next enclosure
1: Downloading: http://feeds.feedburner.com/~r/drmoldawer/~5/96188661/moldawerinthemorning_show30_022607.mp3
0: Looking for the next enclosure
1: Looking for the next enclosure
*** Done

The actual output will depend on whether anyone modifies the subscriptions in the guest account on http://www.CastSampler.com.

See also

Queue
Standard library documentation for this module.
Deque from collections
collections includes a deque (double-ended queue) class
Wikipedia: Queue data structures
http://en.wikipedia.org/wiki/Queue_(data_structure)
Wikipedia: FIFO
http://en.wikipedia.org/wiki/FIFO
feedparser
Mark Pilgrim’s feedparser module (http://www.feedparser.org/).
In-Memory Data Structures
Other complex data structures in the standard library.