How to create a task with Celery in Django? OOP issue?

I'm trying to set up a task with Django and Celery. The Celery and Django configuration works; nothing to report on that side. However, I think I have a problem with how my code is written in OOP style, and I can't locate it. It is an argument problem: the worker reports that 4 arguments were given, but my function only takes 3. Below are the files, what I tried to do, and the error trace.

My files

I want to create a task for the run_test_optimization method:

class MachineLearniaXHelpers(MagicModels):
    def __init__(self, data_dict, target_name, type_model, test_size = 0.33, random_state = 42, **kwargs) -> None:
        self.test_size = test_size
        self.random_state = random_state
        self.process = DataProcessor(
        self.metrics = MetricsScorer(type_model=type_model)
        self.model = self.several_algorythme(type_model, **kwargs)
    @staticmethod
    def split_data(data_dict):
        return train_test_split(*data_dict)
    @staticmethod
    def train_model(model, X, y):
        model.fit(X, y)
    def run_test_optimization(self):
        dict_result = []
        for model in self.model:
            feature_model, values = self.process.transform_dict_to_array_structure()
            x_train, x_test, y_train, y_test = self.split_data(values)
            self.train_model(model, x_train, y_train)
            dict_metrics_train = self.metrics.choice_metrics_by_type_model(y_train, model.predict(x_train))
            dict_metrics_test = self.metrics.choice_metrics_by_type_model(y_test, model.predict(x_test))
            dict_result.append({
                "name_model": model.__class__.__name__,
                "features_model": feature_model,
                "train_performances": dict_metrics_train,
                "test_performances": dict_metrics_test,
            })
        return dict_result

Then I create my task, task_run_test_optimization:

from celery import shared_task
from .helpers import MachineLearniaXHelpers

def task_run_test_optimization(data_dict, target_name, type_model):
    constructor = MachineLearniaXHelpers(
    dict_result = constructor.run_test_optimization()
    return dict_result

The API post method, where the task task_run_test_optimization is called:

class MachineLearningXView(APIView):
    serializer_class = BuildModelMLSerializer
    permission_classes = [IsAuthenticated, UserPermissionMachineLearniaX]

    def post(self, request):
        if not self.request.session.exists(self.request.session.session_key):
        serializer = self.serializer_class(
        if serializer.is_valid():
            data ='data')
            target ='target_name')
            type_model ='test_type')
            result = task_run_test_optimization.delay(data, target, type_model)  # the problem is here
            return Response(status=status.HTTP_200_OK)

Printing result shows a task id in my console, and the POST request is OK:

System check identified no issues (0 silenced).
February 08, 2023 - 17:25:28
Django version 4.1.5, using settings 'ialab.settings'
Starting development server at
Quit the server with CTRL-BREAK.
829e66ee-925f-4aff-8c42-49420e5758ce <-- PRINT result IS HERE
[08/Feb/2023 17:25:47] "POST /api/machinelearniax HTTP/1.1" 200 0

But in my Celery worker, I get this error:

[2023-02-08 17:25:47,813: INFO/MainProcess] Task api.tasks.task_run_test_optimization[829e66ee-925f-4aff-8c42-49420e5758ce] received
[2023-02-08 17:25:47,897: ERROR/MainProcess] Task api.tasks.task_run_test_optimization[829e66ee-925f-4aff-8c42-49420e5758ce] raised unexpected: TypeError('task_run_test_optimization() takes 3 positional arguments but 4 were given')
Traceback (most recent call last):
  File "C:\Users\basti\Desktop\IALab\ialab\.ialabenv\lib\site-packages\celery\app\", line 451, in trace_task
    R = retval = fun(*args, **kwargs)
  File "C:\Users\basti\Desktop\IALab\ialab\.ialabenv\lib\site-packages\celery\app\", line 734, in __protected_call__
    return*args, **kwargs)
TypeError: task_run_test_optimization() takes 3 positional arguments but 4 were given

The task is received but does not run.

Where is my error?


Possible case and solution #1:

A Celery worker loads each task, including its signature, into memory once at startup. You might have started the worker while task_run_test_optimization had a different number of parameters than it has now.

If you changed the signature of task_run_test_optimization afterwards but did not restart Celery, the worker is still running the old version of the task, so the arguments sent by your view no longer match what the worker expects. Restart the Celery worker after every change to the task code.
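The stale-code effect can be reproduced without Celery. The following is a minimal sketch in plain Python: the registry dict and the register helper are hypothetical stand-ins for the task registry a worker keeps in memory, and the extra parameter is invented for illustration. It is not how Celery is implemented internally.

```python
# Hypothetical stand-in for the task registry a worker builds at startup.
registry = {}


def register(fun):
    """Store the function object under its name, like a worker loading a task."""
    registry[fun.__name__] = fun
    return fun


@register
def task_run_test_optimization(data_dict, target_name, type_model, extra):
    # Old version: four parameters ("extra" is made up for this example).
    return "old version"


# The source file is edited afterwards, but the "worker" is not restarted,
# so nothing re-registers the new definition.
def task_run_test_optimization(data_dict, target_name, type_model):
    # New version: three parameters.
    return "new version"


def call_loaded_task(*args):
    """Call whatever the registry holds and report the outcome as a string."""
    try:
        return registry["task_run_test_optimization"](*args)
    except TypeError as exc:
        return f"TypeError: {exc}"


# The caller follows the new signature, the registry still holds the old one.
print(call_loaded_task("data", "target", "regression"))
```

The call fails with a TypeError about the argument count even though the function in the source file looks correct. Re-running the registration, which is what restarting the worker does, makes the two agree again.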

Possible case and solution #2:

Maybe you have two different tasks with the same name, "task_run_test_optimization", and your view imports the wrong one. Make sure you import the right one.
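One way to check which function a name actually refers to is to print its module and signature. Here is a small sketch using only the standard library; the describe helper and the two example functions are hypothetical. With a real Celery task, the registered name is also available as the task's name attribute.

```python
import inspect


def describe(fun):
    """Return 'module.name(signature)' for a callable."""
    return f"{fun.__module__}.{fun.__name__}{inspect.signature(fun)}"


def make_three_argument_version():
    def task_run_test_optimization(data_dict, target_name, type_model):
        return "three-argument version"
    return task_run_test_optimization


def make_four_argument_version():
    # "extra" is an invented parameter, used only to make the two differ.
    def task_run_test_optimization(data_dict, target_name, type_model, extra):
        return "four-argument version"
    return task_run_test_optimization


first = make_three_argument_version()
second = make_four_argument_version()

# Same __name__, different signatures: the printed text tells them apart.
print(describe(first))
print(describe(second))
```

Printing this for the object imported in the view, and comparing it with the task name shown in the worker log (api.tasks.task_run_test_optimization in the trace above), shows whether both sides refer to the same function.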

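A further case that produces exactly this message is offered here as an assumption, because the question does not show the decorator on the task. If the task is declared with bind=True, Celery passes the task instance as the first positional argument, so the function needs a leading self parameter. The sketch below imitates that behaviour in plain Python; FakeBoundTask is a hypothetical stand-in, not Celery's implementation.

```python
class FakeBoundTask:
    """Hypothetical stand-in for a bound task: it prepends itself to the arguments."""

    def __init__(self, fun):
        self.fun = fun

    def run(self, *args, **kwargs):
        # A bound task receives the task instance first, then the caller's arguments.
        return self.fun(self, *args, **kwargs)


def without_self(data_dict, target_name, type_model):
    # Three parameters, no slot for the task instance.
    return "ran"


def with_self(self, data_dict, target_name, type_model):
    # The first parameter receives the task instance.
    return "ran"


def try_run(fun):
    """Run the function as a bound task with three caller arguments."""
    try:
        return FakeBoundTask(fun).run("data", "target", "regression")
    except TypeError as exc:
        return f"TypeError: {exc}"


print(try_run(without_self))
print(try_run(with_self))
```

If that is the situation, either add self as the first parameter of the task function or remove bind=True from the decorator.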