Сельдерей потребляет все очереди
Я пытаюсь достичь следующей конфигурации:
- Send a message to RabbitMQ.
- Copy this message to 2 different queues.
- Run 2 consumers, where each of them would consume from its own queue.
Итак, для отправки сообщений у меня есть следующее:
def publish_message():
with app.producer_pool.acquire(block=True) as producer:
producer.publish(
body={"TEST": "OK"},
exchange='myexchange',
routing_key='mykey',
)
Я создаю своих потребителей таким образом:
with app.pool.acquire(block=True) as conn:
exchange = kombu.Exchange(
name="myexchange",
type="direct",
durable=True,
channel=conn,
)
exchange.declare()
queue1 = kombu.Queue(
name="myqueue",
exchange=exchange,
routing_key="mykey",
channel=conn,
# message_ttl=600,
# queue_arguments={
# "x-queue-type": "classic"
# },
durable=True
)
queue1.declare()
queue2 = kombu.Queue(
name="myotherqueue",
exchange=exchange,
routing_key="mykey",
channel=conn,
# message_ttl=600,
# queue_arguments={
# "x-queue-type": "classic"
# },
durable=True
)
queue2.declare()
class MyConsumer1(bootsteps.ConsumerStep):
def get_consumers(self, channel):
return [
kombu.Consumer(
channel,
queues=[queue1],
callbacks=[self.handle],
accept=["json"]
)
]
def handle(self, body, message):
print(f"\n### 1 ###\nBODY: {body}\nCONS: {self.consumers}\n#########\n")
message.ack()
class MyConsumer2(bootsteps.ConsumerStep):
def get_consumers(self, channel):
return [
kombu.Consumer(
channel,
queues=[queue2],
callbacks=[self.handle],
accept=["json"]
)
]
def handle(self, body, message):
print(f"\n### 2 ###\nBODY: {body}\nCONS: {self.consumers}\n#########\n")
message.ack()
app.steps["consumer"].add(MyConsumer1)
app.steps["consumer"].add(MyConsumer2)
Но когда я запускаю worker, передавая -Q myqueue
, я получаю следующее:
Как видите, один потребитель потребляет сообщения из обеих очередей. Я проверил, что это сообщение отправляется правильно, отключив celery worker, и я могу подтвердить, что сообщение появилось в двух очередях: myqueue
и myotherqueue
.
Я запускаю это как часть проекта Django, если это важно.