Added JWTs and API protection

This commit is contained in:
Ivan Nikolskiy 2025-07-19 17:52:22 +02:00
parent 8633bb09d2
commit 793cb3d83b
4 changed files with 134 additions and 17 deletions

View File

@ -129,6 +129,19 @@ This renders a styled login button based on your template:
Or use `{% hopid_login_url %}` manually in `href`. Or use `{% hopid_login_url %}` manually in `href`.
### 5. Protect API
```django
from django.http import JsonResponse
from django_hopid.decorators import hopid_protected
@hopid_protected
def protected_view(request):
return JsonResponse({
"message": f"Hello {request.user.username}, you're authenticated!",
})
```
--- ---
## ⚙️ Advanced Usage ## ⚙️ Advanced Usage

View File

@ -24,10 +24,56 @@ SOFTWARE.
from functools import wraps from functools import wraps
from django.contrib.auth import login as auth_login from django.contrib.auth import login as auth_login
from django.http import JsonResponse
from . import settings from . import settings
from .validators import verify_id_token from .validators import verify_id_token
from .utils import get_jwt_tokens, get_user_from_token from .utils import (
get_jwt_tokens,
verify_jwt_token,
get_user_from_jwt_claims,
refresh_access_token
)
def hopid_protected(view_func):
@wraps(view_func)
def _wrapped_view(request, *args, **kwargs):
refresh_token = None
is_session_based = False
auth_header = request.headers.get('Authorization', '')
if auth_header.lower().startswith('bearer '):
access_token = auth_header.split(' ', 1)[1].strip()
else:
access_token = request.session.get('access_token')
refresh_token = request.session.get('refresh_token')
is_session_based = True
user = None
if access_token:
claims = verify_jwt_token(access_token)
user = get_user_from_jwt_claims(claims)
if not user and is_session_based and refresh_token:
tokens = refresh_access_token(refresh_token)
if tokens:
access_token = tokens.get('access_token')
refresh_token = tokens.get('refresh_token')
request.session['access_token'] = access_token
request.session['refresh_token'] = refresh_token
claims = verify_jwt_token(access_token)
user = get_user_from_jwt_claims(claims)
if not user:
return JsonResponse({'detail': 'Invalid or expired token.'}, status=401)
request.user = user
return view_func(request, *args, **kwargs)
return _wrapped_view
def hopid_callback(response=None): def hopid_callback(response=None):
@ -37,42 +83,36 @@ def hopid_callback(response=None):
def fail(reason): def fail(reason):
if callable(response): if callable(response):
return response(request, reason) return response(request, reason)
return view_func(request, *args, **kwargs, error=reason) return view_func(request, *args, **kwargs, error=reason)
code = request.GET.get('code') code = request.GET.get('code')
next = request.GET.get('next') next = request.GET.get('next')
if not code: if not code:
return fail("No code returned") return fail("No code returned")
tokens = get_jwt_tokens(code, request.session.pop('pkce_verifier', ''), next) tokens = get_jwt_tokens(code, request.session.pop('pkce_verifier', ''), next)
error = tokens.get('error', '') if tokens.get('error'):
return fail(tokens['error'])
if error:
return fail(error)
access_token = tokens.get('access_token') access_token = tokens.get('access_token')
id_token = tokens.get('id_token') id_token = tokens.get('id_token')
if not access_token or not id_token: if not access_token or not id_token:
return fail("No ID token returned") return fail("No ID token returned")
claims = verify_id_token(id_token, access_token) claims = verify_id_token(id_token, access_token)
if not claims: if not claims or claims.get("nonce") != request.session.pop('oidc_nonce', None):
return fail("Invalid ID token")
expected_nonce = request.session.pop('oidc_nonce', None)
if not claims or claims.get("nonce") != expected_nonce:
return fail("Invalid or missing nonce") return fail("Invalid or missing nonce")
profile = get_user_from_token(access_token) access_claims = verify_jwt_token(access_token)
profile = get_user_from_jwt_claims(access_claims)
if not profile: if not profile:
return fail("User info request failed") return fail("Invalid access token")
request.session['access_token'] = access_token
request.session['refresh_token'] = tokens.get('refresh_token')
auth_login(request, profile) auth_login(request, profile)
return view_func(request, *args, **kwargs) return view_func(request, *args, **kwargs)
return _wrapped_view return _wrapped_view
return decorator return decorator

View File

@ -27,11 +27,76 @@ import base64
import secrets import secrets
import requests import requests
import jwt
from jwt import InvalidTokenError, ExpiredSignatureError
from jwt.algorithms import RSAAlgorithm
from . import settings from . import settings
from urllib.parse import urlencode from urllib.parse import urlencode
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
def verify_jwt_token(token):
try:
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get("kid")
jwks_url = f"{settings.HOPID_URL}/.well-known/jwks.json"
jwks = requests.get(jwks_url, timeout=5).json()
key_data = next((k for k in jwks["keys"] if k["kid"] == kid), None)
if not key_data:
return None
public_key = RSAAlgorithm.from_jwk(key_data)
claims = jwt.decode(
token,
key=public_key,
algorithms=["RS256"],
audience=settings.HOPID_CLIENT_ID,
issuer=settings.HOPID_URL,
)
return claims
except (InvalidTokenError, ExpiredSignatureError, requests.RequestException, KeyError):
return None
def get_user_from_jwt_claims(claims):
if not claims:
return None
username = claims.get("username") or claims.get("sub")
email = claims.get("email")
if not username or not email:
return None
user, _ = get_user_model().objects.get_or_create(
email=email,
defaults={"username": username.lower()}
)
return user
def refresh_access_token(refresh_token):
try:
response = requests.post(f"{settings.HOPID_URL}/o/token/", data={
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'client_id': settings.HOPID_CLIENT_ID,
'client_secret': settings.HOPID_CLIENT_SECRET,
}, timeout=5)
if response.status_code == 200:
return response.json()
except requests.RequestException:
pass
return None
def get_hopid_logout_url(): def get_hopid_logout_url():
next_url = settings.HOPID_CLIENT_LOGOUT_URI or settings.HOPID_CLIENT_URI next_url = settings.HOPID_CLIENT_LOGOUT_URI or settings.HOPID_CLIENT_URI
query = urlencode({"next": next_url}) query = urlencode({"next": next_url})

View File

@ -4,7 +4,6 @@ from django.contrib.auth import logout as django_logout
from .utils import get_hopid_logout_url from .utils import get_hopid_logout_url
from .decorators import hopid_callback from .decorators import hopid_callback
@hopid_callback() @hopid_callback()
def hopid_callback_view(request, *args, **kwargs): def hopid_callback_view(request, *args, **kwargs):
next = request.GET.get('next') or '/' next = request.GET.get('next') or '/'