How to make Celery + Django + Pytest work together
I'm trying to test an endpoint that queues a Celery task, but the task doesn't seem to run during the test. Versions:
- django==4.1.5
- celery==5.2.7
- pytest==7.2.1
- pytest-django==4.5.2
An endpoint:
def do_some_stuff(blah: Blah) -> Blah:
    res = cool_task.apply_async(
        kwargs={
            'cool_id': int(pk),
            'config': config,
            'name': RESUBMIT.value,
        },
        link=update_status.si(
            cool_id=int(pk),
            new_status="why is this so hard",
        ),
    )
    [...]
A test:
@pytest.mark.django_db
def test_my_thing(django_client: Client) -> None:
    [...]
    response = django_client.post(f"/api/myendpoint/{mything.id}/do_some_stuff/")
The request hits the endpoint and gets a 202 back, as expected, but Celery doesn't seem to pick up the task during the test. The update_status task updates the DB, and I'm not seeing that happen.
I've tried creating a Celery app in the test, creating a worker in the test, changing the test to use the main DB instead of the test DB, and overriding settings with BROKER_BACKEND='memory'.
I'd like a full working example. It seems kind of basic but it's eluding me. I don't understand what combination of fixtures and overrides I need for this to work.
Everything seems to work when I actually run the application.