celery.contrib.testing.mocks
API Reference
Useful mocks for unit testing.
- celery.contrib.testing.mocks.TaskMessage(name, id=None, args=(), kwargs=None, callbacks=None, errbacks=None, chain=None, shadow=None, utc=None, **options)
Create task message in protocol 2 format.
- celery.contrib.testing.mocks.TaskMessage1(name, id=None, args=(), kwargs=None, callbacks=None, errbacks=None, chain=None, **options)
Create task message in protocol 1 format.
- celery.contrib.testing.mocks.task_message_from_sig(app, sig, utc=True, TaskMessage=<function TaskMessage>)
Create task message from celery.Signature.
Example
>>> m = task_message_from_sig(app, add.s(2, 2))
>>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')