56 lines
1.9 KiB
Python
56 lines
1.9 KiB
Python
import logging
from typing import Optional, Tuple

from django.http import HttpRequest
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed

from .utils import (
    verify_jwt_token,
    get_user_from_jwt_claims,
    refresh_access_token
)
|
|
|
|
logger = logging.getLogger(__name__)


class HopIDAuthentication(BaseAuthentication):
    """Authenticate requests with a JWT access token.

    The access token is taken from an ``Authorization: Bearer <token>`` header
    when one is present. Otherwise it is read from the session together with a
    refresh token, and one refresh attempt is made if the session's access
    token is missing or cannot be resolved to a user.
    """

    def authenticate(self, request: HttpRequest) -> Optional[Tuple[object, str]]:
        """Resolve the user for *request* and return ``(user, access_token)``.

        Args:
            request: Incoming request. Its headers and session are read, and
                the session is updated after a successful token refresh.

        Returns:
            The resolved user paired with the access token that identified it.

        Raises:
            AuthenticationFailed: If no token could be resolved to a user,
                including when the request carries no credentials at all.
        """
        auth_header = request.headers.get('Authorization', '')
        if auth_header.lower().startswith('bearer '):
            access_token = auth_header.split(' ', 1)[1].strip()
            refresh_token = None
            is_session_based = False
        else:
            access_token = request.session.get('access_token')
            refresh_token = request.session.get('refresh_token')
            is_session_based = True

        user = self._resolve_user(access_token) if access_token else None

        # Only session-backed credentials can be refreshed; a bearer header
        # carries no refresh token.
        if not user and is_session_based and refresh_token:
            refreshed_token = self._refresh_session_tokens(request, refresh_token)
            if refreshed_token:
                access_token = refreshed_token
                user = self._resolve_user(access_token)

        if not user:
            raise AuthenticationFailed('Invalid or expired token.')

        return user, access_token

    @staticmethod
    def _resolve_user(access_token: str) -> Optional[object]:
        """Return the user for *access_token*, or ``None`` if it cannot be resolved."""
        try:
            claims = verify_jwt_token(access_token)
            return get_user_from_jwt_claims(claims, access_token)
        except Exception:
            # The helpers' exception types are not visible from this module,
            # so any failure is treated as "token not usable" and logged.
            logger.warning(
                'Access token could not be resolved to a user.', exc_info=True
            )
            return None

    @staticmethod
    def _refresh_session_tokens(request: HttpRequest, refresh_token: str) -> Optional[str]:
        """Exchange *refresh_token* for new tokens and store them in the session.

        Returns the new access token, or ``None`` when the refresh produced no
        usable access token (the session is left untouched in that case).
        """
        tokens = refresh_access_token(refresh_token)
        if not tokens:
            return None

        new_access_token = tokens.get('access_token')
        if not new_access_token:
            # Never log the response itself: it may contain credentials.
            logger.warning('Token refresh response contained no access token.')
            return None

        request.session['access_token'] = new_access_token
        # Keep the current refresh token when the response does not rotate it,
        # instead of overwriting a still-valid token with None.
        request.session['refresh_token'] = tokens.get('refresh_token') or refresh_token
        return new_access_token
|