Аутентификация по токену в Django (rest_framework) не работает в производственной среде, но работает на локальной машине

Когда я тестирую своё представление с правильным токеном на локальной машине, оно работает корректно, но когда я тестирую то же представление в производственной среде с правильным токеном, я получаю ответ {"detail": "Учетные данные для аутентификации не были предоставлены."} (в оригинале: "Authentication credentials were not provided."). Что может быть причиной этой ошибки в моей производственной среде?

views.py

class AdminUserCreateView(APIView):
    """API endpoint that creates an ``AdminUser`` from an e-mail and a password.

    Access is restricted by ``IsAuthenticated``; callers are authenticated
    through ``AdminTokenAuthentication``.
    """

    authentication_classes = [AdminTokenAuthentication]
    permission_classes = [IsAuthenticated]

    @swagger_auto_schema(
        responses={
            201: "AdminUser created successfully",
            400: "Validation failed",
        },
        request_body=CreateAdminSerializer,
    )
    def post(self, request, *args, **kwargs):
        """Validate the payload and create a new ``AdminUser``.

        Returns:
            * 201 with a success message once the record is created;
            * 400 when the serializer rejects the payload or the e-mail is
              already taken;
            * 500 when the database raises ``IntegrityError``.
        """
        serializer = CreateAdminSerializer(data=request.data)

        # Guard clause: reject invalid payloads before touching the database.
        if not serializer.is_valid():
            failure = {
                "status": "error",
                "message": "Validation failed",
                "errors": serializer.errors,
            }
            return Response(failure, status=status.HTTP_400_BAD_REQUEST)

        email = serializer.validated_data.get("email")
        password = serializer.validated_data.get("password")

        try:
            email_taken = AdminUser.objects.filter(email=email).exists()
            if email_taken:
                duplicate = {"status": "error", "message": "Email already exists"}
                return Response(duplicate, status=status.HTTP_400_BAD_REQUEST)

            # Store only the hashed form of the password.
            AdminUser.objects.create(email=email, password=make_password(password))
        except IntegrityError:
            db_failure = {
                "status": "error",
                "message": "Database integrity error occurred",
            }
            return Response(db_failure, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        created = {"status": "success", "message": "AdminUser created successfully"}
        return Response(created, status=status.HTTP_201_CREATED)

middleware.py

class AdminTokenAuthenticationMiddleware:
    """Middleware that requires an ``AdminUser`` bearer token on ``/admin`` paths.

    Requests whose path does not start with ``/admin`` are passed through
    untouched. For ``/admin`` paths the ``Authorization`` header must have the
    form ``Bearer <token>`` and the token must match ``AdminUser.auth_token``;
    otherwise a 401 JSON response is returned.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Only the admin endpoints are gated by this middleware.
        if request.path.startswith("/admin"):
            return self.process_admin_request(request)

        # For other endpoints, proceed with the request.
        return self.get_response(request)

    @staticmethod
    def _extract_bearer_token(auth_header):
        """Return the token from a ``Bearer <token>`` header, or ``None``.

        ``None`` is returned for a missing header, a different scheme, or a
        header that does not consist of exactly two whitespace-separated
        parts. The scheme is compared case-insensitively.
        """
        if not auth_header:
            return None
        parts = auth_header.split()
        if len(parts) != 2 or parts[0].lower() != "bearer":
            return None
        return parts[1]

    def process_admin_request(self, request):
        """Authenticate an ``/admin`` request or short-circuit with a 401.

        On success the matching ``AdminUser`` is stored on ``request.user``
        and the request continues down the middleware chain.
        """
        token_key = self._extract_bearer_token(request.headers.get("Authorization"))
        if token_key is None:
            # Missing or malformed header (e.g. a bare "Bearer") is reported
            # as missing credentials instead of crashing with IndexError.
            return JsonResponse(
                {"error": "No authentication credentials provided"}, status=401
            )

        # Keep the try body minimal so that an AdminUser.DoesNotExist raised
        # further down the chain is not mistaken for an unknown token.
        try:
            user = AdminUser.objects.get(auth_token=token_key)
        except AdminUser.DoesNotExist:
            return JsonResponse({"error": "User not found god"}, status=401)

        request.user = user
        return self.get_response(request)


class TokenAuthenticationMiddleware:
    """Middleware that attaches a ``User`` to requests carrying a bearer access token.

    Behaviour by case:

    * no ``Authorization`` header, a non-Bearer scheme, or a malformed Bearer
      header: the request proceeds unchanged;
    * the token cannot be decoded, or has no ``user_id`` claim: a 401 JSON
      response is returned;
    * the token decodes but no ``User`` has that id: the request is handed to
      ``AdminTokenAuthenticationMiddleware``;
    * otherwise ``request.user`` is set and the request proceeds.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    @staticmethod
    def _extract_bearer_token(authorization_header):
        """Return the token from a ``Bearer <token>`` header, or ``None``.

        ``None`` is returned for a missing header, a different scheme, or a
        header that does not consist of exactly two whitespace-separated
        parts. The scheme is compared case-insensitively.
        """
        if not authorization_header:
            return None
        parts = authorization_header.split()
        if len(parts) != 2 or parts[0].lower() != "bearer":
            return None
        return parts[1]

    def __call__(self, request):
        access_token = self._extract_bearer_token(
            request.headers.get("Authorization")
        )
        if access_token is None:
            # No usable bearer token (including a bare "Bearer", which used to
            # raise IndexError): let the request through unauthenticated.
            return self.get_response(request)

        # Decode the access token. KeyError covers a token without a
        # "user_id" claim (assumes ``payload`` is a mapping -- TODO confirm).
        try:
            user_id = AccessToken(access_token).payload["user_id"]
        except (InvalidToken, TokenError, KeyError):
            return JsonResponse({"error": "Invalid or expired token"}, status=401)

        # The lookup is isolated in its own try so that exceptions raised
        # further down the chain are not swallowed by these handlers.
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            # If the user is not found, fall back to admin token authentication.
            return AdminTokenAuthenticationMiddleware(self.get_response)(request)

        # Attach the user to the request for later use and continue.
        request.user = user
        return self.get_response(request)

authentication.py

class UserAuthentication(BaseAuthentication):
    """Authentication class that resolves a ``User`` from a bearer access token."""

    def authenticate(self, request):
        """Authenticate the request from its ``Authorization`` header.

        Returns:
            ``None`` when the header is absent or uses a scheme other than
            Bearer, so that other authenticators may run; otherwise the tuple
            ``(user, None)``.

        Raises:
            AuthenticationFailed: the Bearer header is malformed, the token
                cannot be decoded or lacks a ``user_id`` claim, or no ``User``
                has that id.
        """
        authorization_header = request.headers.get("Authorization")
        parts = authorization_header.split() if authorization_header else []

        # The scheme is compared case-insensitively; anything that is not a
        # Bearer credential is not ours to handle.
        if not parts or parts[0].lower() != "bearer":
            return None

        # A bare "Bearer" (or one with extra parts) used to raise IndexError
        # or pick an arbitrary part; reject it explicitly instead.
        if len(parts) != 2:
            raise AuthenticationFailed("Invalid Authorization header")

        # Decode the access token. KeyError covers a token without a
        # "user_id" claim (assumes ``payload`` is a mapping -- TODO confirm).
        try:
            user_id = AccessToken(parts[1]).payload["user_id"]
        except InvalidToken as exc:
            raise AuthenticationFailed("Invalid access token") from exc
        except (TokenError, KeyError) as exc:
            raise AuthenticationFailed("Token is invalid or expired") from exc

        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist as exc:
            raise AuthenticationFailed("User not found") from exc

        # (user, None) signals successful authentication with no auth object.
        return user, None


class AdminTokenAuthentication(BaseAuthentication):
    """Authentication class that resolves an ``AdminUser`` from a bearer token."""

    def authenticate(self, request):
        """Authenticate the request against ``AdminUser.auth_token``.

        Returns:
            ``None`` when the ``Authorization`` header is absent or uses a
            scheme other than Bearer (no credentials provided); otherwise the
            tuple ``(user, None)``.

        Raises:
            AuthenticationFailed: the Bearer header is malformed, or no
                ``AdminUser`` owns the supplied token.
        """
        auth_header = request.headers.get("Authorization")
        parts = auth_header.split() if auth_header else []

        # The scheme is compared case-insensitively; a missing header or a
        # different scheme means no credentials were provided for this class.
        if not parts or parts[0].lower() != "bearer":
            return None

        # A bare "Bearer" used to raise IndexError; reject it explicitly.
        if len(parts) != 2:
            raise AuthenticationFailed("Invalid Authorization header")

        try:
            user = AdminUser.objects.get(auth_token=parts[1])
        except AdminUser.DoesNotExist as exc:
            raise AuthenticationFailed("User not found") from exc

        # Kept from the original implementation: the user is also stored on
        # the request before the (user, auth) tuple is returned.
        request.user = user
        return (user, None)

Вернуться наверх