Почему задача Celery в Django завершается с ошибкой AttributeError?

Здравствуйте! Я создаю веб-приложение на Django, которое получает данные из Microsoft Graph API. Внутри моего view.py я вызываю задачу 'add_api_data':

from django.shortcuts import render
from django.http import HttpResponse, HttpResponseRedirect
from django.urls import reverse
from datetime import datetime, timedelta
from dateutil import tz, parser
from tutorial.auth_helper import get_sign_in_flow, get_token_from_code, store_user, remove_user_and_token, get_token
from tutorial.graph_helper import *
from celery import shared_task
import time
from tutorial import task
import pandas as pd
from tutorial.models import demo,assignment_details

def education(request):
  """Render the education page with every stored assignment_details row.

  Calls task.add_api_data.delay() first and discards the returned handle;
  the view does not wait for that task before reading the table.
  (presumably a Celery task that refreshes the table -- confirm in task.py)
  """
  task.add_api_data.delay()
  assignment_rows = assignment_details.objects.all().values()
  template_vars = {'api_list': assignment_rows}
  return render(request, 'tutorial/education.html', template_vars)


def home1(request):
  """Render the demo page with every stored demo row and the page context.

  The context comes from initialize_context(request). task.add_csv_data is
  dispatched with .delay() and its returned handle is not used here.
  """
  page_context = initialize_context(request)
  task.add_csv_data.delay()
  demo_rows = demo.objects.all().values()
  template_vars = {'title1': demo_rows, 'context': page_context}
  return render(request, 'tutorial/demo1.html', template_vars)

Мой auth_helper.py:

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# <FirstCodeSnippet>
import yaml
import msal
import os
import time

# Load the oauth_settings.yml file (path is relative to the process's current
# working directory). The context manager closes the file handle as soon as
# parsing finishes instead of leaving it open for the life of the process.
with open('oauth_settings.yml', 'r') as stream:
  # SafeLoader builds only plain YAML types; it never constructs arbitrary
  # Python objects from the file.
  settings = yaml.load(stream, yaml.SafeLoader)

def load_cache(request):
  """Build an MSAL token cache, restored from the session when one is stored.

  Args:
    request: object exposing a dict-like ``session`` attribute. Anything
      without ``session`` raises AttributeError here.

  Returns:
    A new msal.SerializableTokenCache; it has been fed the value stored under
    session['token_cache'] when that value is present and truthy.
  """
  token_cache = msal.SerializableTokenCache()

  serialized = request.session.get('token_cache')
  if serialized:
    token_cache.deserialize(serialized)

  return token_cache

def save_cache(request, cache):
  """Write the serialized token cache into the session if it has changed.

  Args:
    request: object exposing a dict-like ``session`` attribute.
    cache: token cache exposing ``has_state_changed`` and ``serialize()``.
  """
  # Nothing to persist when the cache reports no change.
  if not cache.has_state_changed:
    return

  request.session['token_cache'] = cache.serialize()

def get_msal_app(cache=None):
  """Create the MSAL confidential client from the loaded OAuth settings.

  Args:
    cache: optional token cache handed to MSAL as ``token_cache``.

  Returns:
    A msal.ConfidentialClientApplication built from settings['app_id'],
    settings['authority'] and settings['app_secret'].
  """
  return msal.ConfidentialClientApplication(
    settings['app_id'],
    authority=settings['authority'],
    client_credential=settings['app_secret'],
    token_cache=cache,
  )

# Method to generate a sign-in flow
def get_sign_in_flow():
  """Start an auth-code flow and return what MSAL hands back.

  Uses a client without a token cache, with settings['scopes'] as the
  requested scopes and settings['redirect'] as the redirect URI.
  """
  msal_client = get_msal_app()
  sign_in_flow = msal_client.initiate_auth_code_flow(
    settings['scopes'],
    redirect_uri=settings['redirect'],
  )
  return sign_in_flow

# Method to exchange auth code for access token
def get_token_from_code(request):
  """Complete the auth-code flow for this request and return MSAL's result.

  Args:
    request: object exposing a dict-like ``session`` and the query
      parameters as ``GET``.

  Returns:
    Whatever acquire_token_by_auth_code_flow returns.
  """
  token_cache = load_cache(request)
  msal_client = get_msal_app(token_cache)

  # The flow is removed from the session as it is read; an empty dict is
  # passed on when no flow was stored.
  pending_flow = request.session.pop('auth_flow', {})

  token_result = msal_client.acquire_token_by_auth_code_flow(
    pending_flow, request.GET)

  # Store the cache back in the session (only written if it changed).
  save_cache(request, token_cache)
  return token_result
# </FirstCodeSnippet>

# <SecondCodeSnippet>
def store_user(request, user):
  """Save a summary of the signed-in user in the session under 'user'.

  Args:
    request: object exposing a dict-like ``session`` attribute.
    user: dict with 'displayName', 'userPrincipalName' and optionally 'mail'
      and 'mailboxSettings' (presumably the Microsoft Graph user resource --
      confirm against the caller).

  The stored entry has 'is_authenticated', 'name', 'email' and 'timeZone'.
  Storing is best-effort: any exception is printed and swallowed, in which
  case the session is left without a new 'user' entry.
  """
  try:
    # A null or absent 'mail' falls back to the user principal name.
    mail = user.get('mail')
    # A missing or null 'mailboxSettings' is treated as empty so the user is
    # still stored, with the 'UTC' default, instead of being dropped.
    mailbox_settings = user.get('mailboxSettings') or {}

    request.session['user'] = {
      'is_authenticated': True,
      'name': user['displayName'],
      'email': mail if mail is not None else user['userPrincipalName'],
      'timeZone': mailbox_settings.get('timeZone', 'UTC'),
    }
  except Exception as e:
    print(e)

def get_token(request):
  """Return an access token for the signed-in user, or None if unavailable.

  Args:
    request: object exposing a dict-like ``session`` attribute (read by
      load_cache). Passing anything else -- for example a bound Celery task
      instance -- fails in load_cache with AttributeError.

  Returns:
    The 'access_token' string from MSAL's silent token lookup, or None when
    the cache holds no account, the lookup yields nothing, or the result
    carries no 'access_token'.
  """
  cache = load_cache(request)
  auth_app = get_msal_app(cache)

  accounts = auth_app.get_accounts()
  if not accounts:
    # Nobody is signed in according to the cache.
    return None

  result = auth_app.acquire_token_silent(
    settings['scopes'],
    account=accounts[0])

  # Persist before inspecting the result; save_cache only writes when the
  # cache reports a change.
  save_cache(request, cache)

  # acquire_token_silent returns None when the cache lookup yields no token;
  # subscripting that would raise TypeError, so report "no token" instead.
  if not result:
    return None
  return result.get('access_token')

def remove_user_and_token(request):
  """Delete the 'token_cache' and 'user' session entries when present.

  Args:
    request: object exposing a dict-like ``session`` attribute. Other
      session keys are left untouched.
  """
  for session_key in ('token_cache', 'user'):
    if session_key in request.session:
      del request.session[session_key]
# </SecondCodeSnippet>

В task.py я написал две задачи.

Задача add_csv_data выполняется на воркере Celery успешно, без ошибок,

но когда я вызываю задачу add_api_data, она завершается с ошибкой AttributeError:

Traceback (most recent call last):
  File "C:\Users\Ajay\AppData\Local\Programs\Python\Python310\lib\site-packages\celery\app\trace.py", line 451, in trace_task
    R = retval = fun(*args, **kwargs)
  File "C:\Users\Ajay\AppData\Local\Programs\Python\Python310\lib\site-packages\celery\app\trace.py", line 734, in __protected_call__
    return self.run(*args, **kwargs)
  File "F:\django_template\grapapi\graph_tutorial\tutorial\task.py", line 58, in add_api_data
    token = get_token(request)
  File "F:\django_template\grapapi\graph_tutorial\tutorial\auth_helper.py", line 72, in get_token
    cache = load_cache(request)
  File "F:\django_template\grapapi\graph_tutorial\tutorial\auth_helper.py", line 17, in load_cache
    if request.session.get('token_cache'):
AttributeError: 'add_api_data' object has no attribute 'session'
Вернуться наверх