Реализация пользовательской аутентификации в DRF, которая может читать request.data
У меня есть внешний ключ на моих моделях, таких как Patient, и Doctor, которые указывают на класс Clinic. Таким образом, пациент и врач должны принадлежать только этой клинике. Другие клиники не должны иметь возможности видеть детали этих моделей.
Из моего приложения Vue я буду отправлять сообщения с помощью Axios в приложение django, которое использует DRF, и таким образом получать сериализованные данные о пациентах и врачах. Все работает нормально, если я попытаюсь использовать следующий пример кода в функции view:
@api_view(['GET', 'POST'])
def register_patient_vue(request):
if request.method == 'POST':
print("POST details", request.data)
data = request.data['registration_data']
serializer = customerSpecialSerializer(data=data)
if serializer.is_valid():
a = serializer.save()
print(serializer.data)
return Response(serializer.data, status=status.HTTP_201_CREATED)
else:
print("Serializer is notNot valid.")
print(serializer.errors)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
Вывод образца:
POST details {'registration_data': {'name': 'wczz', 'ageyrs': 21, 'agemonths': '', 'dob': '', 'gender': 'unspecified', 'mobile': '2', 'email': '', 'alternate': '', 'address': '', 'marital': 'unspecified', 'city': '', 'occupation': '', 'linkedclinic': 10}}
data: {'name': 'wczz', 'ageyrs': 21, 'agemonths': '', 'dob': '', 'gender': 'unspecified', 'mobile': '2', 'email': '', 'alternate': '', 'address': '', 'marital': 'unspecified', 'city': '', 'occupation': '', 'linkedclinic': 10}
Однако мне нужно аутентифицировать запрос с помощью специальной пользовательской аутентификации. У меня есть другой класс UserGroupMap, который имеет внешние ключи для пользователя и клиники, так что если есть совпадение фильтра для клиники и пользователя в карте, он будет аутентифицирован. В противном случае аутентификация должна быть провалена, и данные не должны быть получены или сохранены сериализатором.
В моем предыдущем простом чистом проекте django я использовал пользовательскую функцию разрешения, и украшал ею мое представление:
@handle_perm(has_permission_level, required_permission='EDIT_CLINICAL_RECORD', login_url='/clinic/')
def some_function(request, dept_id):
....
Some code which runs after authentication
И он будет использовать следующее:
def handle_perm(test_func, required_permission=None, login_url=None, redirect_field_name=REDIRECT_FIELD_NAME):
"""
Decorator for views that checks that the user passes the given test,
redirecting to the log-in page if necessary. The test should be a callable
that takes the user object and returns True if the user passes.
"""
def decorator(view_func):
@wraps(view_func)
def _wrapped_view(request, *args, **kwargs):
print(f"Required permission level is {required_permission}")
if has_permission_level(request, required_permission):
print("User has required permission level..Allowing entry.")
return view_func(request, *args, **kwargs)
print("FAILED! User does not have required permission level. Access blocked.")
path = request.build_absolute_uri()
resolved_login_url = resolve_url(login_url or settings.LOGIN_URL)
# If the login url is the same scheme and net location then just
# use the path as the "next" url.
login_scheme, login_netloc = urlparse(resolved_login_url)[:2]
current_scheme, current_netloc = urlparse(path)[:2]
if ((not login_scheme or login_scheme == current_scheme) and
(not login_netloc or login_netloc == current_netloc)):
path = request.get_full_path()
from django.contrib.auth.views import redirect_to_login
return redirect_to_login(
path, resolved_login_url, redirect_field_name)
return _wrapped_view
return decorator
def has_permission_level(request, required_permission, clinic=None):
print("has_permission_level was called.")
user = request.user
print(f'user is {user}')
clinic=clinic_from_request(request)
print(f"has_permission_level called with clinic:{clinic}")
if clinic is None:
print("clinic is none")
return HttpResponseRedirect('/accounts/login/')
group_maps = UserGroupMap.objects.filter(user=user, clinic=clinic)
print(f"No: of UserGroupMap memberships: {len(group_maps)}")
if len(group_maps) < 1:
# There are no UserGroupMap setup for the user. Kindly set them up.\nHint:Admin>Manage users and groups>Users
return False
# Now checking Group memberships whether the user has any with permisison
for map in group_maps:
rolesmapped = GroupRoleMap.objects.filter(group=map.group)
if len(rolesmapped) < 1:
print(f"No permission roles.")
else:
for rolemap in rolesmapped:
print(f"{rolemap.role}", end=",")
if rolemap.role.name == required_permission:
print(
f"\nAvailable role of [{map.group}] matched required permission of [{required_permission}] in {clinic.name} [Ok]")
return True
return False
Мне нужно создать пользовательскую аутентификацию с использованием DRF, чтобы она читала POSTed данные, проверяла значение linkedclinic и использовала подобную логику.
Я начал так:
def has_permission_POST(request, required_permission, clinic=None):
print("has_permission_POST was called.")
user = request.user
print(f'user is {user}')
if request.method == 'POST':
print(request)
print(dir(request))
print("POST details: POST:", request.POST, "\n")
print("POST details: data:", request.data, "\n")
....
# Further logic to check the mapping
return True
else:
print("Not a valid POST")
return Response("INVALID POST", status=status.HTTP_400_BAD_REQUEST)
# And decorating my DRF view:
@handle_perm(has_permission_POST, required_permission='EDIT_CLINICAL_RECORD', login_url='/clinic/')
@api_view(['GET', 'POST'])
def register_patient_vue(request):
if request.method == 'POST':
print("POST details", request.data)
data = request.data['registration_data']
Проблема в том, что если я запускаю это, то has_permission_POST не может получить значение request.data, которое содержит данные, опубликованные из моего фронтенда. Я могу обойти это, добавив декоратор @api_view(['GET', 'POST'])
к has_permission_POST. Но это приводит к другой ошибке, неудачному утверждению:
AssertionError: Expected a `Response`, `HttpResponse` or `HttpStreamingResponse` to be returned from the view, but received a `<class 'bool'>`
Это происходит из has_permission_POST, когда он украшен @api_view.
Итак, мои проблемы:
- How to implement a custom authentication for my use case?
- If I am going about this right, by using this custom has_permission_level, how can I get the request.data in this function before my actual api view is called, so that I can read the clinic id and do the checks for permission that I need.
Я рассмотрел CustomAuthentication, предоставляемый DRF, но не смог найти, как получить параметры request.data в пользовательском классе.
Благодаря @MihaiChelaru, я смог найти решение своей проблемы.
Я создал пользовательский класс Permission, расширив permissions.BasePermission и использовав свою логику в специальной функции has_permission
. Я пошел дальше и реализовал проверку Token из запроса. Как только токен аутентифицирован, пользователь может быть получен по соответствующему токену из таблицы Token. Я обнаружил, что в классе пользовательского разрешения я могу прочитать полный параметр request.data
, передаваемый Vue и Postman. Прочитав его, я смог легко реализовать пользовательскую проверку разрешений пользователя, которую имели мои пользовательские модели
class CustomerAccessPermission(permissions.BasePermission):
message = 'No permission to create new patient records'
def has_permission(self, request, view):
bearer_authorizn = request.META.get('HTTP_AUTHORIZATION')
try: #Different apps like POSTMAN, and Vue seem to use different strings while passing token
token = bearer_authorizn.split("Bearer ")[1]
except Exception as e:
try:
token = bearer_authorizn.split("Token ")[1]
except Exception as e:
raise NotAuthenticated('Did not get token in request')
try:
token_obj = Token.objects.get(key=token)
except self.model.DoesNotExist:
raise AuthenticationFailed('Invalid token')
if not token_obj.user.is_active:
raise AuthenticationFailed('User inactive or deleted')
print("Username is %s" % token_obj.user.username)
print("POST details", request.data)
linkedclinic_id = request.data['data']['linkedclinic']
clinic = Clinic.objects.get(clinicid=int(linkedclinic_id))
print("Clinic membership requested:", clinic)
group_maps = UserGroupMap.objects.filter(user=user, clinic=clinic)
print(f"No: of UserGroupMap memberships: {len(group_maps)}")
if len(group_maps) > 1:
return True
return False
@api_view(['POST'])
@permission_classes([CustomerAccessPermission])
def register_patient_vue(request):
logger.info('In register_patient_vue...')
...