Linux Solved

All Linux tips, Solved!

A unique Python redis-based queue with delay

April 22nd, 2012

  • queue
  • python
  • unique
  • redis-based
  • delay

This is a simple Redis-based queue. I needed two features: uniqueness (if an item is already in the queue, adding it again does not create a second copy; it only resets that item's delay) and a delay, as in beanstalkd, where an item must wait a specified time before it can be popped from the queue. Several other Redis-based queues have many more features, but I didn't find one that had these two together. This 50-line class works for my needs. It may or may not work for you. Feel free to copy this and build on it.

Note: I wrote this in May 2010. I ended up using this solution after trying out beanstalkd and Gearman.

Install

Install on Ubuntu 10.10 Maverick

  • Install the redis server
    $ sudo apt-get install redis-server 
  • Install the python redis client
    $ pip install redis 
  • Default conf file: /etc/redis/redis.conf
  • Default log file: /var/log/redis/redis-server.log
  • Default db dir: /var/lib/redis
  • Stop redis server: sudo /etc/init.d/redis-server stop
  • Start redis server: sudo /etc/init.d/redis-server start

Redis commands used

The queue is based on the redis sorted set data type and uses the following commands:

  • ZADD - Add a member to a sorted set, or update its score if it already exists
  • ZRANGEBYSCORE - Return a range of members in a sorted set, by score
  • ZREM - Remove one or more members from a sorted set
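To make the semantics of these three commands concrete, here is a minimal in-memory sketch. It does not talk to Redis at all: InMemorySortedSet is a hypothetical stand-in written for illustration, and its method names only mirror the command names. It assumes a plain dict is a fair model of one sorted set, which is enough to show the score update on a duplicate ZADD and the score filter in ZRANGEBYSCORE.

```python
class InMemorySortedSet:
    """Hypothetical stand-in for one Redis sorted set (not part of redis-py)."""

    def __init__(self):
        self._scores = {}  # member -> score; dict keys keep members unique

    def zadd(self, member, score):
        """Add a member, or overwrite its score if it is already present."""
        self._scores[member] = score

    def zrangebyscore(self, min_score, max_score, start=0, num=None):
        """Return members with min_score <= score <= max_score, lowest first."""
        hits = sorted(
            (score, member)
            for member, score in self._scores.items()
            if min_score <= score <= max_score
        )
        members = [member for _, member in hits]
        end = None if num is None else start + num
        return members[start:end]

    def zrem(self, member):
        """Remove a member; return 1 if it was present, else 0."""
        return 1 if self._scores.pop(member, None) is not None else 0


zset = InMemorySortedSet()
zset.zadd('Item 0', 105.0)
zset.zadd('Item 1', 106.0)
zset.zadd('Item 0', 107.0)  # duplicate: the score moves from 105.0 to 107.0

ready_at_104 = zset.zrangebyscore(0, 104.0, start=0, num=1)  # nothing is due yet
ready_at_106 = zset.zrangebyscore(0, 106.0, start=0, num=1)  # only 'Item 1' is due
```

In the queue, the score is the time at which an item becomes poppable, so asking for scores between 0 and "now" returns only the items whose delay has passed.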

Code

import time
import redis

REDIS_ADDRESS = '127.0.0.1'

class UniqueMessageQueueWithDelay(object):
    """A message queue based on the Redis sorted set data type. Duplicate items
    in the queue are not allowed. When a duplicate item is added to the queue,
    only one copy is kept and its score is replaced by the new one. A delay may
    be specified when adding items to the queue. Items will only be popped
    after the delay has passed. pop() is non-blocking, so polling must be used.
    The name of the queue == the Redis key for the sorted set.
    """
    def __init__(self, name):
        self.name = name
        self.redis = redis.Redis(REDIS_ADDRESS)

    def add(self, data, delay=0):
        """Add an item to the queue. delay is in seconds.
        """
        # The score is the earliest time at which the item may be popped.
        score = time.time() + delay
        # Argument order of the legacy redis.Redis client: (name, member, score).
        # redis-py 3.0 and later take a mapping: zadd(name, {member: score}).
        self.redis.zadd(self.name, data, score)
        debug('Added %.1f, %s' % (score, data))

    def pop(self):
        """Return one item from the front of the queue. Items are returned
        only if the delay specified in the add() has passed. The item stays in
        the sorted set until remove() is called. Return False if no items are
        available.
        """
        min_score = 0
        max_score = time.time()
        # Fetch at most one member whose score is <= now.
        result = self.redis.zrangebyscore(
            self.name, min_score, max_score, start=0, num=1, withscores=False)
        if not result:
            return False
        debug('Popped %s' % result[0])
        return result[0]

    def remove(self, data):
        """Delete an item from the queue, typically after pop() returned it.
        """
        return self.redis.zrem(self.name, data)

def debug(msg):
    print msg

def test_queue():
    u = UniqueMessageQueueWithDelay('myqueue')

    # add items to the queue; 'Item 0' and 'Item 1' are added twice
    for i in [0, 1, 2, 3, 4, 0, 1]:
        data = 'Item %d' % i
        delay = 5
        u.add(data, delay)
        time.sleep(0.1)

    # get items from the queue, polling once per second
    while True:
        print
        result = u.pop()
        print result
        if result is not False:
            u.remove(result)
        time.sleep(1)

if __name__ == '__main__':
    test_queue()
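Because pop() never blocks and leaves the item in the sorted set, a consumer usually wraps the pop()/remove() pair in a polling loop. The following is a rough sketch of one way to do that, not part of the class above. drain, StubQueue, max_idle_polls and the injectable sleep argument are all hypothetical names introduced here; StubQueue only imitates the pop()/remove() contract so the sketch runs without a Redis server.

```python
import time


def drain(queue, handle, poll_interval=1.0, max_idle_polls=3, sleep=time.sleep):
    """Poll queue.pop() until it returns False max_idle_polls times in a row.

    Each item is passed to handle() first and removed only afterwards, so an
    item whose handler raises stays in the queue.
    """
    idle = 0
    handled = 0
    while idle < max_idle_polls:
        item = queue.pop()
        if item is False:
            idle += 1
            sleep(poll_interval)
            continue
        idle = 0
        handle(item)
        queue.remove(item)
        handled += 1
    return handled


class StubQueue:
    """Hypothetical stand-in with the same pop()/remove() contract."""

    def __init__(self, items):
        self.items = list(items)

    def pop(self):
        # Like the real pop(): return the front item but do not remove it.
        return self.items[0] if self.items else False

    def remove(self, item):
        self.items.remove(item)


seen = []
count = drain(StubQueue(['a', 'b']), seen.append, sleep=lambda seconds: None)
```

Removing only after the handler returns means a crash mid-item leaves that item in the queue. Nothing here stops two consumers from popping the same item at the same moment; the sketch assumes a single consumer.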

Results of running test_queue():

Added 1320773851.8, Item 0
Added 1320773851.9, Item 1
Added 1320773852.0, Item 2
Added 1320773852.1, Item 3
Added 1320773852.2, Item 4
Added 1320773852.3, Item 0
Added 1320773852.4, Item 1

False

False

False

False

False

Popped Item 2
Item 2

Popped Item 3
Item 3

Popped Item 4
Item 4

Popped Item 0
Item 0

Popped Item 1
Item 1

False

False

False
^CTraceback (most recent call last):
  File "umqwdredisqueue.py", line 102, in <module>
    test_queue()
  File "umqwdredisqueue.py", line 98, in test_queue
    time.sleep(1)
KeyboardInterrupt
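The order of the output follows from the scores. As a sanity check, the sketch below replays the seven "Added" lines in plain Python, with no Redis involved, and reproduces the sequence of pops. It rests on two assumptions that are not in the original run: that the first poll happens 0.1 seconds after the last add, and that each poll takes no time beyond its one-second sleep.

```python
DELAY = 5  # seconds, as passed to add() in test_queue()

# (score, item) pairs copied from the "Added" lines above.
added = [
    (1320773851.8, 'Item 0'),
    (1320773851.9, 'Item 1'),
    (1320773852.0, 'Item 2'),
    (1320773852.1, 'Item 3'),
    (1320773852.2, 'Item 4'),
    (1320773852.3, 'Item 0'),
    (1320773852.4, 'Item 1'),
]

scores = {}
for score, item in added:
    scores[item] = score  # a repeated item keeps only its latest score

# Assumed clock: first poll 0.1 s after the last add, then one poll per second.
poll_time = added[-1][0] - DELAY + 0.1
timeline = []
for _ in range(13):  # the output above shows 13 polls before Ctrl-C
    ready = sorted((s, item) for item, s in scores.items() if s <= poll_time)
    if ready:
        item = ready[0][1]   # lowest score first, like pop()
        del scores[item]     # like remove()
        timeline.append(item)
    else:
        timeline.append(False)
    poll_time += 1
```

Under those assumptions the first five polls find nothing, because the earliest surviving score belongs to Item 2. Item 0 and Item 1 come out last because their second add moved their scores past Item 4's.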

Some links related to Redis queues

  • http://github.com/defunkt/resque
  • http://github.com/samuel/dreque
  • http://github.com/gleicon/restmq
  • http://github.com/tnm/qr
  • http://github.com/blog/542-introducing-resque
  • http://nosql.mypopescu.com/post/366619997/usecase-restmq-a-redis-based-message-queue
  • http://nosql.mypopescu.com/post/426360602/redis-queues-an-emerging-usecase

See also

  • Redis tutorial, April 2010 - by Simon Willison
  • Shades of Gray: Setting up the Redis Server
  • DeGizmo - Getting Started: Redis and Python


Source: www.saltycrane.com

2011 © SolvedLinux.com