"""Database models used by the SQLAlchemy result store backend."""
from datetime import datetime
import sqlalchemy as sa
from sqlalchemy.types import PickleType
from celery import states
from .session import ResultModelBase
__all__ = ('Task', 'TaskExtended', 'TaskSet')
[документация]class Task(ResultModelBase):
"""Task result/status."""
__tablename__ = 'celery_taskmeta'
__table_args__ = {'sqlite_autoincrement': True}
id = sa.Column(sa.Integer, sa.Sequence('task_id_sequence'),
primary_key=True, autoincrement=True)
task_id = sa.Column(sa.String(155), unique=True)
status = sa.Column(sa.String(50), default=states.PENDING)
result = sa.Column(PickleType, nullable=True)
date_done = sa.Column(sa.DateTime, default=datetime.utcnow,
onupdate=datetime.utcnow, nullable=True)
traceback = sa.Column(sa.Text, nullable=True)
def __init__(self, task_id):
self.task_id = task_id
[документация] def to_dict(self):
return {
'task_id': self.task_id,
'status': self.status,
'result': self.result,
'traceback': self.traceback,
'date_done': self.date_done,
}
def __repr__(self):
return '<Task {0.task_id} state: {0.status}>'.format(self)
[документация]class TaskExtended(Task):
"""For the extend result."""
__tablename__ = 'celery_taskmeta'
__table_args__ = {'sqlite_autoincrement': True, 'extend_existing': True}
name = sa.Column(sa.String(155), nullable=True)
args = sa.Column(sa.LargeBinary, nullable=True)
kwargs = sa.Column(sa.LargeBinary, nullable=True)
worker = sa.Column(sa.String(155), nullable=True)
retries = sa.Column(sa.Integer, nullable=True)
queue = sa.Column(sa.String(155), nullable=True)
[документация] def to_dict(self):
task_dict = super().to_dict()
task_dict.update({
'name': self.name,
'args': self.args,
'kwargs': self.kwargs,
'worker': self.worker,
'retries': self.retries,
'queue': self.queue,
})
return task_dict
[документация]class TaskSet(ResultModelBase):
"""TaskSet result."""
__tablename__ = 'celery_tasksetmeta'
__table_args__ = {'sqlite_autoincrement': True}
id = sa.Column(sa.Integer, sa.Sequence('taskset_id_sequence'),
autoincrement=True, primary_key=True)
taskset_id = sa.Column(sa.String(155), unique=True)
result = sa.Column(PickleType, nullable=True)
date_done = sa.Column(sa.DateTime, default=datetime.utcnow,
nullable=True)
def __init__(self, taskset_id, result):
self.taskset_id = taskset_id
self.result = result
[документация] def to_dict(self):
return {
'taskset_id': self.taskset_id,
'result': self.result,
'date_done': self.date_done,
}
def __repr__(self):
return f'<TaskSet: {self.taskset_id}>'