django_dramatiq

Testing with database and pytest

Open husio opened this issue 5 years ago • 6 comments

I am using pytest-django to test my application, with the test setup described in the README file. I have a task that registers a user in the database, and to test it I have the following code:

# tasks.py
@dramatiq.actor
def register_user(email):
    user = User.objects.create(email=email)
    return user.id

# tests.py
@pytest.mark.django_db
def test_register_user_task(broker, worker):
    tasks.register_user.send("[email protected]")
    broker.join(tasks.register_user.queue_name)
    worker.join()
    assert User.objects.count() == 1

The test fails because no user can be found. I believe this is because each test runs in its own database isolation: since dramatiq uses threads for its workers, each worker gets a separate view of the database state that is not shared with the test's main thread. Is this correct?

If my understanding is correct, would it make sense to provide a task worker that runs synchronously in the same thread/process as the test? I have written a prototype of this approach and it seems to work as expected. Would it be helpful to include it in either dramatiq or django_dramatiq?

husio, Sep 06 '19 14:09

IIRC the default mode in tests is to handle db ops inside a transaction and then roll it back. In order for data to be visible outside of the transaction (i.e. in the worker threads), the transaction needs to be committed. What you need to do, I think, is pass the transaction=True parameter to django_db, like so:


@pytest.mark.django_db(transaction=True)
def test_register_user_task(broker, worker):
    tasks.register_user.send("[email protected]")
    broker.join(tasks.register_user.queue_name)
    worker.join()
    assert User.objects.count() == 1
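
The visibility rule described above (rows written inside an open transaction are invisible to other connections until commit) can be seen in isolation with a small sketch. It uses the standard library's sqlite3 as a stand-in, not Django or dramatiq, and the `users` table, the placeholder address, and the `count_users` and `demo_visibility` helpers are assumptions made up for the illustration:

```python
import os
import sqlite3
import tempfile


def count_users(conn):
    # fetchall() exhausts the cursor, so no read lock lingers on this connection.
    return conn.execute("SELECT COUNT(*) FROM users").fetchall()[0][0]


def demo_visibility():
    # A file-backed database lets two independent connections share the data,
    # roughly like a test thread and a worker thread each holding a connection.
    path = os.path.join(tempfile.mkdtemp(), "demo.sqlite3")
    writer = sqlite3.connect(path)  # plays the role of the code inside the transaction
    reader = sqlite3.connect(path)  # plays the role of the other thread
    try:
        writer.execute("CREATE TABLE users (email TEXT)")
        writer.commit()

        # sqlite3 opens a transaction implicitly before the INSERT.
        writer.execute("INSERT INTO users VALUES ('user@example.com')")
        before_commit = count_users(reader)  # the row is not committed yet

        writer.commit()
        after_commit = count_users(reader)  # now the row is visible
        return before_commit, after_commit
    finally:
        writer.close()
        reader.close()


if __name__ == "__main__":
    print(demo_visibility())
```

The reader only sees the row once the writer commits, which is the same reason the data has to be committed before another connection can observe it.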

would it make sense to provide a task worker that runs synchronously in the same thread/process as the test

This has been brought up a couple of times, and I'm still not sure exactly where I sit on the issue. On the one hand, yes, the way things work right now does make testing harder than it otherwise would be. On the other, the way things work right now means that the tests better reflect the real world behavior of your application, which, IME, is more valuable than ease of use.

Bogdanp, Sep 07 '19 14:09

I tried to reproduce my database issues in a freshly created Django test application. Background processing, task joining and database operations all work as advertised there. Since I could not reproduce the problem, I tried the same in my production application, and this time everything seems to work as expected. Adding transaction=True is important, and I don't know what I did before that kept the test from passing.

I think having a synchronous task worker would be a nice addition. On the other hand, it might be confusing for users why there are two worker implementations and which one is the right choice for testing. I think this is a library design decision that you are much better placed to make :wink: Please close this issue if you decide not to add synchronous task processing.

This is the code that I wrote in order to process the tasks synchronously. Calling wait_for_task is equivalent to processing all queued tasks. I don't think it is a proper implementation. I leave it here because it may be of some value for future discussion or reference.

# conftest.py
import dramatiq
import pytest
from dramatiq import worker


@pytest.fixture()
def wait_for_task():
    broker = dramatiq.get_broker()
    broker.flush_all()
    worker = SynchronousWorker(broker, worker_timeout=100)
    worker.start()

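    def wait_for_task(*task_funcs):
        queue_names = {fn.queue_name for fn in task_funcs}
        worker.process_sync()
        # StubBroker.join() takes a single queue name, so join each queue in turn.
        for queue_name in queue_names:
            broker.join(queue_name, fail_fast=True)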

    yield wait_for_task


class SynchronousWorker(worker.Worker):
    def _add_worker(self):
        worker = WorkerLocalThread(
                broker=self.broker,
                consumers=self.consumers,
                work_queue=self.work_queue,
                worker_timeout=self.worker_timeout
                )
        worker.start()
        self.workers.append(worker)

    def process_sync(self):
        for w in self.workers:
            w.process_sync()


class WorkerLocalThread(worker._WorkerThread):
    def start(self):
        # Overwrite this method and avoid creating a separate thread.
        # No super().start()
        pass

    def process_sync(self):
        while not self.work_queue.empty():
            _, message = self.work_queue.get()
            self.process_message(message)
        self.broker.emit_before("worker_thread_shutdown", self)

    def join(self, timeout=0):
        pass

husio, Sep 09 '19 08:09

Hi,

I believe it would be very useful to mention transaction=True in the documentation. It is not obvious, and it would help many people (including me) avoid spending hours debugging this issue.

hassek, Mar 20 '20 13:03

How can I use the wait_for_task fixture in tests? Please give an example. Thank you.

Guest007, Apr 30 '21 08:04

My private working solution:

from dramatiq import actor as dramatiq_actor

class ActorForTest:
    def __init__(self, *args, **kwargs):
        self.fn = args[0]

    def send(self, *args, **kwargs):
        return self.fn(*args, **kwargs)

def actor(*args, actor_class=None, **kwargs):
    if TEST_DRAMATIQ:
        return in_test_actor(*args, **kwargs)
    return dramatiq_actor(*args, actor_class=actor_class, **kwargs)

def in_test_actor(fn=None, *, actor_class=ActorForTest, actor_name=None, queue_name='default', priority=0, broker=None, **options):
    def decorator(fn):
        return actor_class(
            # Honor an explicit actor_name; fall back to the function name.
            fn, actor_name=actor_name or fn.__name__, queue_name=queue_name,
            # Pass the broker argument itself, not the string 'broker'.
            priority=priority, broker=broker, options=options,
        )
    if fn is None:
        return decorator
    return decorator(fn)

Where TEST_DRAMATIQ is a flag set from an environment variable.
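
The post does not show how TEST_DRAMATIQ is read, so the following is only one possible sketch. The `env_flag` helper and the `_TRUTHY` set are hypothetical names, not part of dramatiq or django_dramatiq. The point is that environment variables are always strings, so a bare `os.environ.get("TEST_DRAMATIQ")` would treat "0" or "false" as true:

```python
import os

# Strings treated as "enabled"; this set is an assumption, adjust to taste.
_TRUTHY = {"1", "true", "yes", "on"}


def env_flag(name, default=False, environ=None):
    """Interpret an environment variable as a boolean flag."""
    environ = os.environ if environ is None else environ
    raw = environ.get(name)
    if raw is None:
        # Variable not set at all: fall back to the default.
        return default
    return raw.strip().lower() in _TRUTHY


# Evaluated once at import time, before the actor decorators run.
TEST_DRAMATIQ = env_flag("TEST_DRAMATIQ")
```

With this in place, running the test suite with `TEST_DRAMATIQ=1` set would switch the `actor` wrapper above to the in-test implementation, while leaving the variable unset or set to `0` keeps the real dramatiq actor.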

Guest007, May 07 '21 10:05

My solution:

from dramatiq.brokers.stub import StubBroker


class DramatiqStub(StubBroker):
    def enqueue(self, message, *, delay=None):
        return self.actors[message.actor_name].fn(*message.args, **message.kwargs)

vsevolodgarkusha, Oct 20 '23 17:10