Added new features
This commit is contained in:
parent
f59a7cd3b0
commit
a75d135589
52
django_hopid/auth.py
Normal file
52
django_hopid/auth.py
Normal file
@ -0,0 +1,52 @@
|
||||
from rest_framework.authentication import BaseAuthentication
|
||||
from rest_framework.exceptions import AuthenticationFailed
|
||||
from django.http import HttpRequest
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from .utils import (
|
||||
verify_jwt_token,
|
||||
get_user_from_jwt_claims,
|
||||
refresh_access_token
|
||||
)
|
||||
|
||||
class HopIDAuthentication(BaseAuthentication):
|
||||
def authenticate(self, request: HttpRequest) -> Optional[Tuple[object, str]]:
|
||||
refresh_token = None
|
||||
is_session_based = False
|
||||
|
||||
auth_header = request.headers.get('Authorization', '')
|
||||
if auth_header.lower().startswith('bearer '):
|
||||
access_token = auth_header.split(' ', 1)[1].strip()
|
||||
else:
|
||||
access_token = request.session.get('access_token')
|
||||
refresh_token = request.session.get('refresh_token')
|
||||
is_session_based = True
|
||||
|
||||
user = None
|
||||
if access_token:
|
||||
try:
|
||||
claims = verify_jwt_token(access_token)
|
||||
user = get_user_from_jwt_claims(claims, access_token)
|
||||
except Exception:
|
||||
user = None
|
||||
|
||||
# Try refreshing session-based tokens
|
||||
if not user and is_session_based and refresh_token:
|
||||
tokens = refresh_access_token(refresh_token)
|
||||
if tokens:
|
||||
access_token = tokens.get('access_token')
|
||||
refresh_token = tokens.get('refresh_token')
|
||||
|
||||
request.session['access_token'] = access_token
|
||||
request.session['refresh_token'] = refresh_token
|
||||
|
||||
try:
|
||||
claims = verify_jwt_token(access_token)
|
||||
user = get_user_from_jwt_claims(claims, access_token)
|
||||
except Exception:
|
||||
user = None
|
||||
|
||||
if not user:
|
||||
raise AuthenticationFailed('Invalid or expired token.')
|
||||
|
||||
return user, access_token
|
@ -53,7 +53,7 @@ def hopid_protected(view_func):
|
||||
user = None
|
||||
if access_token:
|
||||
claims = verify_jwt_token(access_token)
|
||||
user = get_user_from_jwt_claims(claims)
|
||||
user = get_user_from_jwt_claims(claims, access_token)
|
||||
|
||||
if not user and is_session_based and refresh_token:
|
||||
tokens = refresh_access_token(refresh_token)
|
||||
@ -65,7 +65,7 @@ def hopid_protected(view_func):
|
||||
request.session['refresh_token'] = refresh_token
|
||||
|
||||
claims = verify_jwt_token(access_token)
|
||||
user = get_user_from_jwt_claims(claims)
|
||||
user = get_user_from_jwt_claims(claims, access_token)
|
||||
|
||||
if not user:
|
||||
return JsonResponse({'detail': 'Invalid or expired token.'}, status=401)
|
||||
@ -104,7 +104,7 @@ def hopid_callback(response=None):
|
||||
return fail("Invalid or missing nonce")
|
||||
|
||||
access_claims = verify_jwt_token(access_token)
|
||||
profile = get_user_from_jwt_claims(access_claims)
|
||||
profile = get_user_from_jwt_claims(access_claims, access_token)
|
||||
if not profile:
|
||||
return fail("Invalid access token")
|
||||
|
||||
|
@ -54,16 +54,18 @@ def verify_jwt_token(token):
|
||||
token,
|
||||
key=public_key,
|
||||
algorithms=["RS256"],
|
||||
audience=settings.HOPID_CLIENT_ID,
|
||||
#audience=settings.HOPID_CLIENT_ID,
|
||||
issuer=settings.HOPID_URL,
|
||||
options={"verify_aud": False} # BAD PRACTICE / DELETE
|
||||
)
|
||||
return claims
|
||||
|
||||
except (InvalidTokenError, ExpiredSignatureError, requests.RequestException, KeyError):
|
||||
except (InvalidTokenError, ExpiredSignatureError, requests.RequestException, KeyError) as e:
|
||||
print(e)
|
||||
return None
|
||||
|
||||
|
||||
def get_user_from_jwt_claims(claims):
|
||||
def get_user_from_jwt_claims(claims, access_token):
|
||||
if not claims:
|
||||
return None
|
||||
|
||||
@ -73,10 +75,35 @@ def get_user_from_jwt_claims(claims):
|
||||
if not username or not email:
|
||||
return None
|
||||
|
||||
user, _ = get_user_model().objects.get_or_create(
|
||||
email=email,
|
||||
defaults={"username": username.lower()}
|
||||
User = get_user_model()
|
||||
user = User.objects.filter(email=email).first()
|
||||
|
||||
if user:
|
||||
return user
|
||||
|
||||
if not access_token:
|
||||
return None
|
||||
|
||||
try:
|
||||
response = requests.get(
|
||||
f"{settings.HOPID_URL}/users/userinfo/",
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
timeout=5
|
||||
)
|
||||
if response.status_code != 200:
|
||||
return None
|
||||
userinfo = response.json()
|
||||
except requests.RequestException:
|
||||
return None
|
||||
|
||||
model_fields = {f.name for f in User._meta.get_fields()}
|
||||
safe_defaults = {
|
||||
k: v for k, v in userinfo.items()
|
||||
if k in model_fields and k not in {'id', 'pk', 'password', 'email', 'username'}
|
||||
}
|
||||
safe_defaults.setdefault('username', username.lower())
|
||||
user, _ = User.objects.get_or_create(email=email, defaults=safe_defaults)
|
||||
|
||||
return user
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user