Saltycrane logo

SaltyCrane Blog

Notes on Python, Django, and web development on Ubuntu Linux

    

A unique Python redis-based queue with delay

This is a simple Redis-based queue. Two features that I needed were uniqueness (i.e. if an item exists in the queue already, it won't be added again) and a delay, like beanstalkd, where an item must wait a specified time before it can be popped from the queue. There are a number of other Redis-based queues that have many more features but I didn't see one that had these two features together. This 50-line class works for my needs. It may or may not work for you. Feel free to copy this and build on it.

Note: I wrote this in May 2010. I ended up using this solution after trying out beanstalkd and Gearman.

Install

Install on Ubuntu 10.10 Maverick

  • Install the redis server
    $ sudo apt-get install redis-server 
  • Install the python redis client
    $ pip install redis 
  • Default conf file: /etc/redis/redis.conf
    Default log file: /var/log/redis/redis-server.log
    Default db dir: /var/lib/redis
    Stop redis server: sudo /etc/init.d/redis-server stop
    Start redis server: sudo /etc/init.d/redis-server start

Redis commands used

The queue is based on the redis sorted set data type and uses the following commands:

  • ZADD - Add members to a sorted set, or update its score if it already exists
  • ZRANGEBYSCORE - Return a range of members in a sorted set, by score
  • ZREM - Remove one or more members from a sorted set

Code

import time
import redis


REDIS_ADDRESS = '127.0.0.1'


class UniqueMessageQueueWithDelay(object):
    """A message queue based on the Redis sorted set data type. Duplicate items
    in the queue are not allowed. When a duplicate item is added to the queue,
    the new item is added, and the old duplicate item is removed. A delay may be
    specified when adding items to the queue. Items will only be popped after
    the delay has passed. Pop() is non-blocking, so polling must be used. The
    name of the queue == the Redis key for the sorted set.
    """
    def __init__(self, name):
        self.name = name
        self.redis = redis.Redis(REDIS_ADDRESS)

    def add(self, data, delay=0):
        """Add an item to the queue. delay is in seconds.
        """
        score = time.time() + delay
        self.redis.zadd(self.name, data, score)
        debug('Added %.1f, %s' % (score, data))

    def pop(self):
        """Pop one item from the front of the queue. Items are popped only if
        the delay specified in the add() has passed. Return False if no items
        are available.
        """
        min_score = 0
        max_score = time.time()
        result = self.redis.zrangebyscore(
            self.name, min_score, max_score, start=0, num=1, withscores=False)
        if result == None:
            return False
        if len(result) == 1:
            debug('Popped %s' % result[0])
            return result[0]
        else:
            return False

    def remove(self, data):
        return self.redis.zrem(self.name, data)


def debug(msg):
    print msg


def test_queue():
    u = UniqueMessageQueueWithDelay('myqueue')

    # add items to the queue
    for i in [0, 1, 2, 3, 4, 0, 1]:
        data = 'Item %d' % i
        delay = 5
        u.add(data, delay)
        time.sleep(0.1)

    # get items from the queue
    while True:
        print
        result = u.pop()
        print result
        if result != False:
            u.remove(result)
        time.sleep(1)


if __name__ == '__main__':
    test_queue()

Results:

Added 1320773851.8, Item 0
Added 1320773851.9, Item 1
Added 1320773852.0, Item 2
Added 1320773852.1, Item 3
Added 1320773852.2, Item 4
Added 1320773852.3, Item 0
Added 1320773852.4, Item 1

False

False

False

False

False

Popped Item 2
Item 2

Popped Item 3
Item 3

Popped Item 4
Item 4

Popped Item 0
Item 0

Popped Item 1
Item 1

False

False

False
^CTraceback (most recent call last):
  File "umqwdredisqueue.py", line 102, in 
    test_queue()
  File "umqwdredisqueue.py", line 98, in test_queue
    time.sleep(1)
KeyboardInterrupt

See also

2 Comments — feed icon Comments feed for this post


#1 Sajal commented on 2013-04-28:

Was trying to do the same thing, with only difference was i dont care about uniqueness (my objects are unique from before), and i ended up writing pretty much the exact same approach as you, but i find one problem with this.

This is not a real pop. If 2 workers are working on the same que, it is possible both get the same object when doing zrangebyscore at the same time before any one of them is able to zrem it. This is a very important consideration since i am in process of re-architecting the application to be able to run concurrently across cores (and eventually machines) with redis (or something) maintaining state.

Since i use redis for other things, id much rather use it for the que as well. Any ideas of the pop can be done in an atomic manner?

Will look into beanstalkd next.


#2 Jabba Laci commented on 2014-03-15:

Why do you need a delay, where an item must wait a specified time before it can be popped from the queue? What use case did you have?

Post a comment

Required
Required, but not displayed
Optional

Format using Markdown. (No HTML.)
  • Code blocks: prefix each line by at least 4 spaces or 1 tab (and a blank line before and after)
  • Code span: surround with backticks
  • Blockquotes: prefix lines to be quoted with >
  • Links: <URL>
  • Links w/ description: [description](URL)
Created with Django | Hosted by Linode