Почему задача Celery в Django завершается с ошибкой AttributeError?
Здравствуйте! Я создаю веб-приложение на Django, которое получает данные из Microsoft Graph API, и внутри view.py моего приложения вызываю задачу 'add_api_data':
from django.shortcuts import render
from django.http import HttpResponse, HttpResponseRedirect
from django.urls import reverse
from datetime import datetime, timedelta
from dateutil import tz, parser
from tutorial.auth_helper import get_sign_in_flow, get_token_from_code, store_user, remove_user_and_token, get_token
from tutorial.graph_helper import *
from celery import shared_task
import time
from tutorial import task
import pandas as pd
from tutorial.models import demo,assignment_details
def education(request):
    """Render the education page listing every stored assignment detail.

    Each call also queues the ``add_api_data`` Celery task. ``delay()`` only
    enqueues the task, so the rows read below are whatever is already in the
    database, not necessarily the output of the run queued here.
    """
    # Fire-and-forget: the AsyncResult handle is never used, so it is not bound.
    # NOTE(review): the task is queued without arguments, yet the traceback
    # shows it calling get_token(request); a worker has no HTTP session, so
    # the token probably has to be fetched here and passed in - confirm
    # against task.py.
    task.add_api_data.delay()
    api_list = assignment_details.objects.all().values()
    return render(request, 'tutorial/education.html', {'api_list': api_list})
def home1(request):
    """Render the demo page with every ``demo`` row and the shared context.

    Each call also queues the ``add_csv_data`` Celery task. ``delay()`` only
    enqueues the task, so the rows read below are whatever is already in the
    database when the view runs.
    """
    context = initialize_context(request)
    # Fire-and-forget: the AsyncResult handle is never used, so it is not bound.
    task.add_csv_data.delay()
    title1 = demo.objects.all().values()
    return render(
        request,
        'tutorial/demo1.html',
        {'title1': title1, 'context': context},
    )
Мой auth_helper.py:
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# <FirstCodeSnippet>
import yaml
import msal
import os
import time
# Load the oauth_settings.yml file once at import time. The context manager
# closes the file handle as soon as parsing is done instead of leaving it
# open for the lifetime of the process.
with open('oauth_settings.yml', 'r') as stream:
    settings = yaml.load(stream, yaml.SafeLoader)
def load_cache(request):
    """Build an MSAL token cache, restoring it from the session if present.

    Returns a new ``msal.SerializableTokenCache``. When the session holds a
    truthy ``'token_cache'`` entry, that value is deserialized into the cache
    before it is returned.
    """
    token_cache = msal.SerializableTokenCache()
    serialized = request.session.get('token_cache')
    if serialized:
        token_cache.deserialize(serialized)
    return token_cache
def save_cache(request, cache):
    """Write the token cache back to the session when it has changed.

    Nothing is written when ``cache.has_state_changed`` is falsy; otherwise
    the serialized cache is stored under the ``'token_cache'`` session key.
    """
    if not cache.has_state_changed:
        return
    request.session['token_cache'] = cache.serialize()
def get_msal_app(cache=None):
    """Create the MSAL confidential client from the loaded OAuth settings.

    ``cache`` is handed to MSAL as its token cache; the app id, authority and
    app secret are read from the module-level ``settings`` mapping.
    """
    client_id = settings['app_id']
    options = {
        'authority': settings['authority'],
        'client_credential': settings['app_secret'],
        'token_cache': cache,
    }
    return msal.ConfidentialClientApplication(client_id, **options)
# Build the auth-code sign-in flow
def get_sign_in_flow():
    """Start an auth-code flow for the configured scopes and redirect URI.

    Uses a fresh MSAL app without a token cache and returns whatever
    ``initiate_auth_code_flow`` produces.
    """
    scopes = settings['scopes']
    redirect_uri = settings['redirect']
    return get_msal_app().initiate_auth_code_flow(scopes, redirect_uri=redirect_uri)
# Exchange the returned auth code for an access token
def get_token_from_code(request):
    """Complete the auth-code flow using the flow saved in the session.

    Pops ``'auth_flow'`` from the session (defaulting to an empty dict),
    passes it with ``request.GET`` to MSAL, saves the token cache back to the
    session, and returns MSAL's result unchanged.
    """
    token_cache = load_cache(request)
    msal_app = get_msal_app(token_cache)

    # The flow is removed from the session as it is read.
    saved_flow = request.session.pop('auth_flow', {})
    token_result = msal_app.acquire_token_by_auth_code_flow(saved_flow, request.GET)

    save_cache(request, token_cache)
    return token_result
# </FirstCodeSnippet>
# <SecondCodeSnippet>
def store_user(request, user):
    """Save a summary of the signed-in user under ``request.session['user']``.

    The stored dict has the keys ``is_authenticated``, ``name``, ``email`` and
    ``timeZone``. ``email`` falls back to ``user['userPrincipalName']`` when
    ``user['mail']`` is None, and ``timeZone`` defaults to ``'UTC'`` when the
    mailbox settings carry no ``'timeZone'`` entry.

    Best-effort: any exception raised while reading ``user`` is printed and
    swallowed, in which case ``request.session['user']`` is not set.
    """
    try:
        name = user['displayName']
        mail = user['mail']
        email = mail if mail is not None else user['userPrincipalName']
        time_zone = user['mailboxSettings'].get('timeZone', 'UTC')
        request.session['user'] = {
            'is_authenticated': True,
            'name': name,
            'email': email,
            'timeZone': time_zone,
        }
    except Exception as e:
        # Deliberately non-fatal: an incomplete profile must not break sign-in.
        print(e)
def get_token(request):
    """Return an access token for the signed-in user, or None if unavailable.

    The token cache is loaded from the session and the first cached account
    is used for a silent token acquisition. None is returned when the cache
    holds no account or when the silent lookup yields no token, instead of
    failing on a missing or None result.
    """
    cache = load_cache(request)
    auth_app = get_msal_app(cache)

    accounts = auth_app.get_accounts()
    if not accounts:
        # Nobody is signed in for this session; there is nothing to acquire.
        return None

    result = auth_app.acquire_token_silent(
        settings['scopes'],
        account=accounts[0])
    # Persist the cache before inspecting the result, as the lookup may have
    # changed it.
    save_cache(request, cache)

    if not result:
        # acquire_token_silent returns None when the cache yields no token.
        return None
    return result['access_token']
def remove_user_and_token(request):
    """Drop the token cache and the user record from the session.

    Keys that are not present are skipped; other session entries are left
    untouched.
    """
    for key in ('token_cache', 'user'):
        if key in request.session:
            del request.session[key]
В task.py я написал две задачи.
Задача add_csv_data выполняется на воркере Celery успешно, без ошибок,
но когда я вызываю задачу add_api_data, она завершается с ошибкой AttributeError:
Traceback (most recent call last):
File "C:\Users\Ajay\AppData\Local\Programs\Python\Python310\lib\site-packages\celery\app\trace.py", line 451, in trace_task
R = retval = fun(*args, **kwargs)
File "C:\Users\Ajay\AppData\Local\Programs\Python\Python310\lib\site-packages\celery\app\trace.py", line 734, in __protected_call__
return self.run(*args, **kwargs)
File "F:\django_template\grapapi\graph_tutorial\tutorial\task.py", line 58, in add_api_data
token = get_token(request)
File "F:\django_template\grapapi\graph_tutorial\tutorial\auth_helper.py", line 72, in get_token
cache = load_cache(request)
File "F:\django_template\grapapi\graph_tutorial\tutorial\auth_helper.py", line 17, in load_cache
if request.session.get('token_cache'):
AttributeError: 'add_api_data' object has no attribute 'session'