django-sqs icon indicating copy to clipboard operation
django-sqs copied to clipboard

Integrate Amazon Simple Queue Service in your Django project

#+COMMENT: -- org -- #+TITLE: django-sqs #+AUTHOR: Maciej Pasternacki #+EMAIL: [email protected] #+DATE: 2010-01-15 Fri #+MAINTAINER:Adrien Lemaire #+TEXT: Integrate Amazon Simple Queue Service in your Django project #+KEYWORDS: #+LANGUAGE: en #+OPTIONS: H:3 num:t toc:t \n:nil @:t ::t |:t ^:t -:t f:t *:t <:t #+OPTIONS: TeX:t LaTeX:t skip:nil d:nil todo:t pri:nil tags:not-in-toc #+INFOJS_OPT: view:nil toc:nil ltoc:t mouse:underline buttons:0 path:http://orgmode.org/org-info.js #+EXPORT_SELECT_TAGS: export #+EXPORT_EXCLUDE_TAGS: noexport #+LINK_UP: #+LINK_HOME: http://github.com/fandekasp/django-sqs/

  • Note on Upgrades Being overwhelmed by work, I won't upgrade this package if nobody asks for it. If you're capable, please fork and pull request to help me, thanks in advance !

  • Setup Boto library for accessing Amazon Web Services is required.

    1. Add =django_sqs= to your Python path
    2. Add =django_sqs= to INSTALLED_APPS setting
    3. Set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
    4. Optionally set SQS_QUEUE_PREFIX to prefix your queues and avoid clashes with other developers, production/staging env and so on. SQS_QUEUE_PREFIX is required when DEBUG is true, and recommended even in production mode.
    5. Optionally set SQS_DEFAULT_VISIBILITY_TIMEOUT (default is 60 seconds)
    6. Optionally set SQS_POLL_PERIOD (default is 10 seconds)
  • Receivers Create receiver function that accepts one argument, which will be an instance of =boto.sqs.message.Message= or its custom subclass. Then, register it as queue receiver:

** Register using a decorator Decorate receiver function with:

: django_sqs.receiver([queue_name=None, visibility_timeout=None, message_class=None, delete_on_start=False, close_database=False, suffixes=()])

Decorated function will become an instance of =django_sqs.registered_queue.RegisteredQueue.ReceiverProxy= class. Instance is callable - you may call it with an instance of appropriate message class, or its constructor's keyword arguments, and the message will be added to the queue. Instance provides also attributes =direct= with original decorated function, and =registered_queue= with appropriate =django_sqs.registered_queue.RegisteredQueue= instance.

If =queue_name= is omitted or false, function's =module= and =name= is used, with dots converted to double underscores.

The =suffixes= argument is a tuple of known queue name suffixes. If an unknown suffix is used, a warning will be issued.

If =delete_on_start= is true, received message will be deleted from queue right after it's been received, before receiver function is called. If it is false (which is the default), it will be deleted after receiver function has finished, when message has been fully processed.

If =close_database= is true, all database connections will be closed after processing each message to prevent pending unclosed transactions.

Queue name suffixes can be used to split processing similar items to multiple queues (e.g. use separate queue for big input items to distribute load).

** Register manually Alternatively, you can avoid decoration, and register a receiver manually by calling:

: django_sqs.register(queue_name, [fn=None, visibility_timeout=None, message_class=None, delete_on_start=False, suffixes=()])

If =fn= is None or not given, no handler is assigned: messages can be sent, but won't be received.

Create function in modules that will be imported by default (recommendation: use =receivers.py= and import them in =models.py=, autoimporting TBD).

** Example receiver : @receiver("test") : def receive_message(msg): : print 'received:', msg.get_body()

  • Receiving : python manage.py runreceiver [--message-limit=N] [--suffix=SUFFIX] [queue_name [queue_name [...]]]

    If no =queue_name= parameters are given, receive from all configured queues.

    If more than one queue is registered, a new process is forked for each queue.

    For each message received on the queue, registered receiver function is called with the message instance as argument. If receiver function returns successfully, message is then deleted from queue. If receiver message raises an exception, exception traceback is logged using logging module, and message is deleted. If receiver sees a restartable error and wants to keep message in queue, it should raise =django_sqs.RestartLater= exception - this exception will leave the message in queue.

    Options:

    • =--message-limit=N= :: exit after receiving =N= messages
    • =--suffix=SUFFIX= :: Use queue name suffix
  • Sending ** Using decorated function You can simply call function decorated with =@receiver= decorator, providing a message instance or keyword arguments (like for =send= function described below). ** Manually To send a message manually, use following function:

    : django_sqs.send(queue_name, message=None, suffix=None, **kwargs)

    =message= should be an instance of =message_class= configured with =receiver= decorator or =register= function for the queue (or =boto.sqs.message.Message=).

    When =message= is omitted or =None=, new instance of queue's message class will be instantiated using =**kwargs=. With default message class, =boto.sqs.message.Message=, we can simply provide body:

    : django_sqs.send("a_queue", body='Lorem ipsum dolor sit amet')

    =suffix= is a queue name suffix to use.

  • Custom message classes For sending other values than raw, non-unicode strings, any of classes provided in =boto.sqs.message= or their subclasses may be used. The module is well commented (much better than this one), so go ahead and read the fine source!

** ModelInstanceMessage class The =django_sqs.message.ModelInstanceMessage= class is provided for convenience. It encodes a single model instance, using Django's ContentType framework (as app/model/primary key triple). It accepts =instance= keyword parameter in constructor, and provides =get_instance()= method.

There is no support for passing additional information except the instance yet.

  • Management ** manage.py sqs_status Prints the (approximate) count of messages in the queue. ** manage.py sqs_clear Clears all queues (by default), or queues named as arguments. Prints number of messages deleted.

    If queue receivers are running or were running recently, some messages may be still locked and won't be deleted. Command may need to be re-run. ** manage.py sqs_wait Waits until specified (or all) queues are empty.

  • Views A single view, =django_sqs.views.status=, is provided for simple, plain text queue status report (same as =manage.py sqs_status=).

  • FIXME ** TODO Sensible forking/threading or multiplexing instead of the fork hack? ** TODO Autoimporting receivers.py from apps ** TODO docstrings ** TODO Minimize polling Amazon charges for every call. Less polling, lower invoice. Some exponential backoff + out-of-band signal (view?) to wake up a running receiver process may be a good thing. ** TODO Custom exception to leave message in queue Provide a custom exception class that won't be handled by receive loop (i.e. no backtrace) that can be used by receiver function to explicitly leave message in queue without printing backtrace and alarming everyone.