RabbitMQ с проектом Django и Scrapy для отправки и обработки сообщений

У меня есть приложение Django, которое работает с Celery и использует RabbitMQ в качестве брокера сообщений. У меня есть отдельный проект для scrapy, откуда я собираю данные и хочу отправить эти данные в rabbitMQ, а затем django будет потреблять эти сообщения RabbitMQ через celery. Мне нужна помощь в потреблении сообщений, отправленных в rabbitMQ из проекта scrapy.

фрагмент кода.

scrapy

def process_item(self, item, spider):
    publish_message(item)    
    return item

def publish_message(data):
    import pika
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', port=5672))
    channel = connection.channel()
    channel.basic_publish(exchange='topic', routing_key='scrapy', body='Hello From 
     scrapy!')
    connection.close()

В приложении django, consumers.py

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600,
                                                               blocked_connection_timeout=300))
channel = connection.channel()

def callback(ch, method, properties, body):
    print(" data =============== ", data)
    # I will call celery task here once code print the data to make sure its running. unfortunately its not running. :( 
    return 


channel.basic_consume(queue='scrapy', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()
connection.close()

celery.py

from kombu import Exchange, Queue

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_project.settings.development')

celery_app = Celery('my_project', broker='amqp://guest:guest@rabbit:5672', backend='rpc://0.0.0.0:5672')
celery_app.config_from_object(f'django.conf:settings', namespace='CELERY')
celery_app.autodiscover_tasks()

celery_app.conf.update(
    worker_max_tasks_per_child=1,
    broker_pool_limit=None
)

default_exchange = Exchange('default', type='topic')
scrapy_exchange = Exchange('scrapy', type='topic')

celery_app.conf.task_queues = (
Queue('scrapy', scrapy_exchange, routing_key='scrapy.#'),
)

Вы не объявили очередь при потреблении. попробуйте это:

Издатель

def process_item(self, item, spider):
publish_message(item)    
return item

def publish_message(data):
    import pika
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', port=5672))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic')
    channel.basic_publish(exchange='topic', routing_key='scrapy', body='Hello From 
     scrapy!')
    connection.close()

Потребитель

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600,
                                                               blocked_connection_timeout=300))
channel = connection.channel()

def callback(ch, method, properties, body):
    print(" data =============== ", body)
    # I will call celery task here once code print the data to make sure its running. unfortunately its not running. :( 
    return 
channel.exchange_declare(exchange='topic')
channel.queue_declare(queue='scrapy')
channel.queue_bind(exchange='topic', queue='scrapy', routing_key='scrapy')
channel.basic_consume(queue='scrapy', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()
connection.close()
Вернуться на верх