Сельдерей потребляет все очереди

Я пытаюсь достичь следующей конфигурации:

  1. Send a message to RabbitMQ.
  2. Copy this message to 2 different queues.
  3. Run 2 consumers, where each of them would consume from its own queue.

Итак, для отправки сообщений у меня есть следующее:

def publish_message():
    with app.producer_pool.acquire(block=True) as producer:
        producer.publish(
            body={"TEST": "OK"},
            exchange='myexchange',
            routing_key='mykey',
        )

Я создаю своих потребителей таким образом:


with app.pool.acquire(block=True) as conn:
    exchange = kombu.Exchange(
        name="myexchange",
        type="direct",
        durable=True,
        channel=conn,
    )
    exchange.declare()
    queue1 = kombu.Queue(
        name="myqueue",
        exchange=exchange,
        routing_key="mykey",
        channel=conn,
        # message_ttl=600,
        # queue_arguments={
        #     "x-queue-type": "classic"
        # },
        durable=True
    )
    queue1.declare()
    queue2 = kombu.Queue(
        name="myotherqueue",
        exchange=exchange,
        routing_key="mykey",
        channel=conn,
        # message_ttl=600,
        # queue_arguments={
        #     "x-queue-type": "classic"
        # },
        durable=True
    )
    queue2.declare()


    class MyConsumer1(bootsteps.ConsumerStep):
        def get_consumers(self, channel):
            return [
                    kombu.Consumer(
                        channel,
                        queues=[queue1],
                        callbacks=[self.handle],
                        accept=["json"]
                )
            ]

        def handle(self, body, message):
            print(f"\n### 1 ###\nBODY: {body}\nCONS: {self.consumers}\n#########\n")
            message.ack()


    class MyConsumer2(bootsteps.ConsumerStep):
        def get_consumers(self, channel):
            return [
                kombu.Consumer(
                    channel,
                    queues=[queue2],
                    callbacks=[self.handle],
                    accept=["json"]
                )
            ]

        def handle(self, body, message):
            print(f"\n### 2 ###\nBODY: {body}\nCONS: {self.consumers}\n#########\n")
            message.ack()


    app.steps["consumer"].add(MyConsumer1)
    app.steps["consumer"].add(MyConsumer2)

Но когда я запускаю worker, передавая -Q myqueue, я получаю следующее:

Как видите, один потребитель потребляет сообщения из обеих очередей. Я проверил, что это сообщение отправляется правильно, отключив celery worker, и я могу подтвердить, что сообщение появилось в двух очередях: myqueue и myotherqueue.

Я запускаю это как часть проекта Django, если это важно.

Вернуться на верх