Переопределение класса Celery Task

Пытаюсь реализовать задачу, в которой глобальные переменные разделяются между двумя разными задачами Celery. Для этого я унаследовал класс задачи и использовал свойство. Согласно документации по Celery, базовый класс инициализируется при вызове новой задачи. Есть ли у нас подход, позволяющий повторно использовать объект между задачами? Можем ли мы переопределить метод run() из Task? Если мы переопределим метод run. Как мы можем зарегистрировать задачу в celery, используя Celery 5. X ? Пробовал сериализовать объект... Любой альтернативный подход будет оценен по достоинству.

class handler(Task):

def __init__(self):
    self.base_obj = ""

@property
def global_handler(self):
    return self.global_thread_handler

@property
def base_handler(self):
    return self.base_obj


@app.task(base=handler)
def test123():
    test123.base_handler = cls1()


@app.task(base=handler)
def test456():
    test456.base_handler.method()

Регистрацию задачи в Celery можно просто выполнить, используя что-то вроде этого: # my_app/tasks.py import celery

from my_app.celery import app

class MyTask(celery.Task): def run(self): [...]

MyTask = app.register_task(MyTask()))

Я думаю, что нет никакого способа повторного использования объектов внутри задач. Кто-нибудь может меня поправить?

Celery использует несколько процессов для асинхронного выполнения задач. Независимо от того, какой класс Task вы будете использовать, он будет виден только внутри рабочего процесса. Если вы хотите обмениваться переменными между процессами, вам придется использовать multiprocessing специфические объекты, чтобы иметь возможность обмениваться значениями вне одного процесса.:

from celery import Celery, Task
from multiprocessing import Queue

BROKER_URL = 'sqla+sqlite:///celery.db'

app = Celery(
    'tasks',
    broker='sqla+sqlite:///celery.db',
    backend='db+sqlite:///celery_results.db'
)

q = Queue()

@app.task
def set_var(val: str):
    # put is appending new value to Queue every time when we call set_var
    q.put(val)
    print(f'Global variable has been changed to {val}')


@app.task
def get_var():
    if not q.empty():
        # get is popping first value from Queue if Queue is not empty
        val = q.get()
        print(f'Global variable is {val}')
        return val
    else:
        print('Global variable is EMPTY')

Итак, если мы выполним:

>>> from tasks import set_var, get_var
>>> get_var.delay()
<AsyncResult: 387d02a2-a993-4ea4-9dc6-37b4a6657f92>
>>> set_var.delay('NEW VALUE')
<AsyncResult: 8b02519f-cd87-4e88-9270-2c621f84b14e>
>>> get_var.delay()
<AsyncResult: 828047b1-5f3a-4980-b922-224d0cfb628d>

Журналы будут выглядеть следующим образом:

[2023-03-25 20:38:29,670: INFO/MainProcess] Task tasks.get_var[387d02a2-a993-4ea4-9dc6-37b4a6657f92] received
[2023-03-25 20:38:29,674: WARNING/ForkPoolWorker-1] Global variable is EMPTY
[2023-03-25 20:38:29,723: INFO/ForkPoolWorker-1] Task tasks.get_var[387d02a2-a993-4ea4-9dc6-37b4a6657f92] succeeded in 0.049737373999960255s: None
[2023-03-25 20:38:45,801: INFO/MainProcess] Task tasks.set_var[8b02519f-cd87-4e88-9270-2c621f84b14e] received
[2023-03-25 20:38:45,802: WARNING/ForkPoolWorker-2] Global variable has been changed to NEW VALUE
[2023-03-25 20:38:45,822: INFO/ForkPoolWorker-2] Task tasks.set_var[8b02519f-cd87-4e88-9270-2c621f84b14e] succeeded in 0.02061020500059385s: None
[2023-03-25 20:38:49,846: INFO/MainProcess] Task tasks.get_var[828047b1-5f3a-4980-b922-224d0cfb628d] received
[2023-03-25 20:38:49,848: WARNING/ForkPoolWorker-3] Global variable is NEW VALUE
[2023-03-25 20:38:49,870: INFO/ForkPoolWorker-3] Task tasks.get_var[828047b1-5f3a-4980-b922-224d0cfb628d] succeeded in 0.022652251000181423s: 'NEW VALUE'

Как видите, ForkPoolWorker-3 способен прочитать значение, которое было установлено внутри ForkPoolWorker-2

Вы должны быть осторожны с общими переменными из-за блокировок и проблем с параллелизмом. Я рекомендую просто использовать БД для хранения значений, которые должны быть доступны из разных источников

Вернуться на верх