Celery запрещает использование словарного типа данных

Я определил shared_task с celery следующим образом:

@shared_task(bind=True)
def create_parameters_for_task_creation(project_type, dataset_instance_ids, filter_string, sampling_mode, sampling_parameters, variable_parameters, project_id) -> None: 
    """Function to create the paramters for the task creation process. The function is passed arguments from the frontend which decide how the sentences have to be filtered and sampled. 

    Args:
        project_type (str): Describes the type of project passed by the user
        dataset_instance_ids (int): ID of the dataset that has been provided for the annotation task 
        filter_string (str): _description_
        sampling_mode (str): Method of sampling
        sampling_parameters (dict): Parameters for sampling
        variable_parameters (dict): _description_
        project_id (int): ID of the project object created in this iteration

    """

Как только я пытаюсь запустить celery workers - celery -A shoonya_backend.celery worker -l info, я получаю следующие ошибки.

Unrecoverable error: TypeError("cannot pickle 'dict_keys' object")

Я считаю, что Celery не позволяет мне передавать тип данных dictionary, что странно, потому что мои настройки позволяют передавать тип данных json.

CELERY_BROKER_URL = 'redis://localhost:6380'
result_backend = 'redis://localhost:6380'
accept_content = 'json'
result_serializer = 'json'
task_serializer = 'json'

Что следует сделать?

Вот как я решил эту проблему, изменив celery.py файл.

from __future__ import absolute_import, unicode_literals

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'shoonya_backend.settings')

# Define celery app and settings
celery_app = Celery('shoonya_backend',
                    result_backend = 'django-db',
                    accept_content = ['application/json'],
                    result_serializer = 'json',
                    task_serializer = 'json')

celery_app.config_from_object('django.conf:settings', namespace='CELERY')
celery_app.autodiscover_tasks()

@celery_app.task(bind=True)
def debug_task(self):
    ''' First task for task handling testing and to apply migrations to the celery results db '''
    print(f'Request: {self.request!r}') 
Вернуться на верх