RabbitMQ с проектом Django и Scrapy для отправки и обработки сообщений
У меня есть приложение Django, которое работает с Celery и использует RabbitMQ в качестве брокера сообщений. У меня есть отдельный проект для scrapy, откуда я собираю данные и хочу отправить эти данные в rabbitMQ, а затем django будет потреблять эти сообщения RabbitMQ через celery. Мне нужна помощь в потреблении сообщений, отправленных в rabbitMQ из проекта scrapy.
фрагмент кода.
scrapy
def process_item(self, item, spider):
publish_message(item)
return item
def publish_message(data):
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', port=5672))
channel = connection.channel()
channel.basic_publish(exchange='topic', routing_key='scrapy', body='Hello From
scrapy!')
connection.close()
В приложении django, consumers.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600,
blocked_connection_timeout=300))
channel = connection.channel()
def callback(ch, method, properties, body):
print(" data =============== ", data)
# I will call celery task here once code print the data to make sure its running. unfortunately its not running. :(
return
channel.basic_consume(queue='scrapy', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()
connection.close()
celery.py
from kombu import Exchange, Queue
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_project.settings.development')
celery_app = Celery('my_project', broker='amqp://guest:guest@rabbit:5672', backend='rpc://0.0.0.0:5672')
celery_app.config_from_object(f'django.conf:settings', namespace='CELERY')
celery_app.autodiscover_tasks()
celery_app.conf.update(
worker_max_tasks_per_child=1,
broker_pool_limit=None
)
default_exchange = Exchange('default', type='topic')
scrapy_exchange = Exchange('scrapy', type='topic')
celery_app.conf.task_queues = (
Queue('scrapy', scrapy_exchange, routing_key='scrapy.#'),
)
Вы не объявили очередь при потреблении. попробуйте это:
Издатель
def process_item(self, item, spider):
publish_message(item)
return item
def publish_message(data):
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='topic')
channel.basic_publish(exchange='topic', routing_key='scrapy', body='Hello From
scrapy!')
connection.close()
Потребитель
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600,
blocked_connection_timeout=300))
channel = connection.channel()
def callback(ch, method, properties, body):
print(" data =============== ", body)
# I will call celery task here once code print the data to make sure its running. unfortunately its not running. :(
return
channel.exchange_declare(exchange='topic')
channel.queue_declare(queue='scrapy')
channel.queue_bind(exchange='topic', queue='scrapy', routing_key='scrapy')
channel.basic_consume(queue='scrapy', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()
connection.close()