How to make Celery + Django + Pytest work together

Trying to test an endpoint that has a celery task. Celery tasks don't seem to run in the test.

  • django==4.1.5
  • celery==5.2.7
  • pytest==7.2.1
  • pytest-django==4.5.2

An endpoint:

def do_some_stuff(blah: Blah) -> Blah:
          res = cool_task.apply_async(kwargs={
            'cool_id': int(pk),
            'config': config,
            'name': RESUBMIT.value,
                new_status="why is this so hard",

A test:

def test_my_thing(django_client: Client) -> None:
  response ="/api/myendpoint/{}/do_some_stuff/")

It hits the endpoint. Gets a 202 back as expected. But celery doesn't seem to be picking up the task in the test. The update_status method updates the db, and I'm not seeing that happen.

I've tried creating a celery app in the test, creating a worker in the test, changing the test to use the main DB instead of the test db, setting the override_settings to "BROKER_BANDEND='memory'".

I'd like a full working example. It seems kind of basic but it's eluding me. I don't understand what combination of fixtures and overrides I need for this to work.

Seems to work when I actually call the application.

Back to Top