This is a simple Redis-based queue. Two features that I needed were uniqueness (i.e. if an item exists in the queue already, it won't be added again) and a delay, like beanstalkd, where an item must wait a specified time before it can be popped from the queue. There are a number of other Redis-based queues that have many more features but I didn't see one that had these two features together. This 50-line class works for my needs. It may or may not work for you. Feel free to copy this and build on it.
Install on Ubuntu 10.10 Maverick
$ sudo apt-get install redis-server
$ pip install redis
The queue is based on the redis sorted set data type and uses the following commands:
import time import redis REDIS_ADDRESS = '127.0.0.1' class UniqueMessageQueueWithDelay(object): """A message queue based on the Redis sorted set data type. Duplicate items in the queue are not allowed. When a duplicate item is added to the queue, the new item is added, and the old duplicate item is removed. A delay may be specified when adding items to the queue. Items will only be popped after the delay has passed. Pop() is non-blocking, so polling must be used. The name of the queue == the Redis key for the sorted set. """ def __init__(self, name): self.name = name self.redis = redis.Redis(REDIS_ADDRESS) def add(self, data, delay=0): """Add an item to the queue. delay is in seconds. """ score = time.time() + delay self.redis.zadd(self.name, data, score) debug('Added %.1f, %s' % (score, data)) def pop(self): """Pop one item from the front of the queue. Items are popped only if the delay specified in the add() has passed. Return False if no items are available. """ min_score = 0 max_score = time.time() result = self.redis.zrangebyscore( self.name, min_score, max_score, start=0, num=1, withscores=False) if result == None: return False if len(result) == 1: debug('Popped %s' % result) return result else: return False def remove(self, data): return self.redis.zrem(self.name, data) def debug(msg): print msg def test_queue(): u = UniqueMessageQueueWithDelay('myqueue') # add items to the queue for i in [0, 1, 2, 3, 4, 0, 1]: data = 'Item %d' % i delay = 5 u.add(data, delay) time.sleep(0.1) # get items from the queue while True: print result = u.pop() print result if result != False: u.remove(result) time.sleep(1) if __name__ == '__main__': test_queue()
Added 1320773851.8, Item 0 Added 1320773851.9, Item 1 Added 1320773852.0, Item 2 Added 1320773852.1, Item 3 Added 1320773852.2, Item 4 Added 1320773852.3, Item 0 Added 1320773852.4, Item 1 False False False False False Popped Item 2 Item 2 Popped Item 3 Item 3 Popped Item 4 Item 4 Popped Item 0 Item 0 Popped Item 1 Item 1 False False False ^CTraceback (most recent call last): File "umqwdredisqueue.py", line 102, in
test_queue() File "umqwdredisqueue.py", line 98, in test_queue time.sleep(1) KeyboardInterrupt
Was trying to do the same thing, with only difference was i dont care about uniqueness (my objects are unique from before), and i ended up writing pretty much the exact same approach as you, but i find one problem with this.
This is not a real pop. If 2 workers are working on the same que, it is possible both get the same object when doing zrangebyscore at the same time before any one of them is able to zrem it. This is a very important consideration since i am in process of re-architecting the application to be able to run concurrently across cores (and eventually machines) with redis (or something) maintaining state.
Since i use redis for other things, id much rather use it for the que as well. Any ideas of the pop can be done in an atomic manner?
Will look into beanstalkd next.
Why do you need a delay, where an item must wait a specified time before it can be popped from the queue? What use case did you have?
I'm Eliot and this is my notepad for programming topics such as Python, Django, Ubuntu, Emacs, etc... more »