Исходный код celery.backends.database.models

"""Database models used by the SQLAlchemy result store backend."""
from datetime import datetime

import sqlalchemy as sa
from sqlalchemy.types import PickleType

from celery import states

from .session import ResultModelBase

__all__ = ('Task', 'TaskExtended', 'TaskSet')


[документация]class Task(ResultModelBase): """Task result/status.""" __tablename__ = 'celery_taskmeta' __table_args__ = {'sqlite_autoincrement': True} id = sa.Column(sa.Integer, sa.Sequence('task_id_sequence'), primary_key=True, autoincrement=True) task_id = sa.Column(sa.String(155), unique=True) status = sa.Column(sa.String(50), default=states.PENDING) result = sa.Column(PickleType, nullable=True) date_done = sa.Column(sa.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=True) traceback = sa.Column(sa.Text, nullable=True) def __init__(self, task_id): self.task_id = task_id
[документация] def to_dict(self): return { 'task_id': self.task_id, 'status': self.status, 'result': self.result, 'traceback': self.traceback, 'date_done': self.date_done, }
def __repr__(self): return '<Task {0.task_id} state: {0.status}>'.format(self)
[документация] @classmethod def configure(cls, schema=None, name=None): cls.__table__.schema = schema cls.id.default.schema = schema cls.__table__.name = name or cls.__tablename__
[документация]class TaskExtended(Task): """For the extend result.""" __tablename__ = 'celery_taskmeta' __table_args__ = {'sqlite_autoincrement': True, 'extend_existing': True} name = sa.Column(sa.String(155), nullable=True) args = sa.Column(sa.LargeBinary, nullable=True) kwargs = sa.Column(sa.LargeBinary, nullable=True) worker = sa.Column(sa.String(155), nullable=True) retries = sa.Column(sa.Integer, nullable=True) queue = sa.Column(sa.String(155), nullable=True)
[документация] def to_dict(self): task_dict = super().to_dict() task_dict.update({ 'name': self.name, 'args': self.args, 'kwargs': self.kwargs, 'worker': self.worker, 'retries': self.retries, 'queue': self.queue, }) return task_dict
[документация]class TaskSet(ResultModelBase): """TaskSet result.""" __tablename__ = 'celery_tasksetmeta' __table_args__ = {'sqlite_autoincrement': True} id = sa.Column(sa.Integer, sa.Sequence('taskset_id_sequence'), autoincrement=True, primary_key=True) taskset_id = sa.Column(sa.String(155), unique=True) result = sa.Column(PickleType, nullable=True) date_done = sa.Column(sa.DateTime, default=datetime.utcnow, nullable=True) def __init__(self, taskset_id, result): self.taskset_id = taskset_id self.result = result
[документация] def to_dict(self): return { 'taskset_id': self.taskset_id, 'result': self.result, 'date_done': self.date_done, }
def __repr__(self): return f'<TaskSet: {self.taskset_id}>'
[документация] @classmethod def configure(cls, schema=None, name=None): cls.__table__.schema = schema cls.id.default.schema = schema cls.__table__.name = name or cls.__tablename__
Вернуться на верх