238 lines
7.2 KiB
Python
238 lines
7.2 KiB
Python
"""
|
|
MIT License
|
|
|
|
Copyright (c) 2025 Hopsenn
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"), to deal
|
|
in the Software without restriction, including without limitation the rights
|
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
copies of the Software, and to permit persons to whom the Software is
|
|
furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in all
|
|
copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
SOFTWARE.
|
|
"""
|
|
|
|
import hashlib
|
|
import base64
|
|
import secrets
|
|
import requests
|
|
|
|
import jwt
|
|
from jwt import InvalidTokenError, ExpiredSignatureError
|
|
from jwt.algorithms import RSAAlgorithm
|
|
|
|
from . import settings
|
|
from urllib.parse import urlencode
|
|
from django.contrib.auth import get_user_model
|
|
|
|
|
|
def verify_jwt_token(token):
    """Verify a JWT against the HopID JWKS document and return its claims.

    The signing key is selected by the ``kid`` found in the token's
    (unverified) header from the key set served at
    ``{settings.HOPID_URL}/.well-known/jwks.json``. The token is then decoded
    as RS256 with ``settings.HOPID_URL`` as the required issuer.

    Args:
        token: The encoded JWT.

    Returns:
        The decoded claims, or ``None`` when no key with a matching ``kid``
        is published, the JWKS document cannot be fetched or parsed, the
        published key is malformed, or the token fails validation.
    """
    import logging

    log = logging.getLogger(__name__)

    try:
        kid = jwt.get_unverified_header(token).get("kid")

        jwks_url = f"{settings.HOPID_URL}/.well-known/jwks.json"
        jwks_response = requests.get(jwks_url, timeout=5)
        # An error page is not a key set; HTTPError is a RequestException.
        jwks_response.raise_for_status()
        jwks = jwks_response.json()

        key_data = next((k for k in jwks["keys"] if k["kid"] == kid), None)
        if not key_data:
            return None

        public_key = RSAAlgorithm.from_jwk(key_data)

        # NOTE(review): audience verification is still switched off, exactly
        # as before. It should be enabled by passing
        # ``audience=settings.HOPID_CLIENT_ID`` and dropping ``verify_aud``
        # once it is confirmed that issued tokens carry that audience.
        return jwt.decode(
            token,
            key=public_key,
            algorithms=["RS256"],
            issuer=settings.HOPID_URL,
            options={"verify_aud": False},
        )
    except (
        # PyJWTError is the base of InvalidTokenError / ExpiredSignatureError
        # and also of InvalidKeyError, which from_jwk raises on a bad JWK.
        jwt.PyJWTError,
        requests.RequestException,
        # Malformed JWKS payloads: missing keys, wrong container type,
        # or a body that is not JSON at all.
        KeyError,
        TypeError,
        ValueError,
    ) as exc:
        log.warning("JWT verification failed: %s", exc)
        return None
|
|
|
|
|
|
def get_user_from_jwt_claims(claims, access_token):
    """Return the local user matching the given JWT claims, creating it if needed.

    The user is looked up by the ``email`` claim. When no such user exists
    and an access token is supplied, the HopID userinfo endpoint is queried
    and a user is created from the fields it returns.

    Args:
        claims: Decoded JWT claims; must provide ``email`` and either
            ``username`` or ``sub``.
        access_token: Bearer token used to call
            ``{settings.HOPID_URL}/users/userinfo/``. May be falsy, in which
            case no user is created.

    Returns:
        A user model instance, or ``None`` when the claims are incomplete,
        the user does not exist and cannot be created, or the userinfo
        request fails or returns an unusable payload.
    """
    if not claims:
        return None

    username = claims.get("username") or claims.get("sub")
    email = claims.get("email")
    if not username or not email:
        return None

    User = get_user_model()
    user = User.objects.filter(email=email).first()
    if user:
        return user

    if not access_token:
        return None

    try:
        response = requests.get(
            f"{settings.HOPID_URL}/users/userinfo/",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=5,
        )
        if response.status_code != 200:
            return None
        userinfo = response.json()
    except (requests.RequestException, ValueError):
        # ValueError covers JSON decoding failures on requests versions
        # whose decode error is not a RequestException.
        return None

    if not isinstance(userinfo, dict):
        return None

    # Only concrete (column-backed) fields can be passed as creation
    # defaults; many-to-many and reverse relations reported by get_fields()
    # cannot be assigned at creation time.
    model_fields = {
        f.name for f in User._meta.get_fields() if getattr(f, "concrete", False)
    }
    # Identity fields are never taken from the userinfo payload.
    reserved = {'id', 'pk', 'password', 'email', 'username'}
    # NOTE(review): any other model field present in the payload is copied
    # as-is -- confirm whether privilege flags should be excluded as well.
    safe_defaults = {
        k: v for k, v in userinfo.items()
        if k in model_fields and k not in reserved
    }
    safe_defaults['username'] = username.lower()

    user, _ = User.objects.get_or_create(email=email, defaults=safe_defaults)
    return user
|
|
|
|
|
|
def refresh_access_token(refresh_token):
    """Exchange a refresh token for a new token set at the HopID token endpoint.

    Args:
        refresh_token: The refresh token to redeem.

    Returns:
        The parsed JSON body of a 200 response, or ``None`` when the endpoint
        answers with any other status or the request raises
        ``requests.RequestException``.
    """
    try:
        reply = requests.post(
            f"{settings.HOPID_URL}/o/token/",
            data={
                'grant_type': 'refresh_token',
                'refresh_token': refresh_token,
                'client_id': settings.HOPID_CLIENT_ID,
                'client_secret': settings.HOPID_CLIENT_SECRET,
            },
            timeout=5,
        )
        return reply.json() if reply.status_code == 200 else None
    except requests.RequestException:
        return None
|
|
|
|
|
|
def get_hopid_logout_url():
    """Build the HopID logout URL carrying a ``next`` redirect target.

    The target is ``settings.HOPID_CLIENT_LOGOUT_URI`` when it is truthy,
    otherwise ``settings.HOPID_CLIENT_URI``.

    Returns:
        The logout URL with the target URL-encoded in the query string.
    """
    target = settings.HOPID_CLIENT_LOGOUT_URI
    if not target:
        target = settings.HOPID_CLIENT_URI

    encoded = urlencode({"next": target})
    return f"{settings.HOPID_URL}/users/logout/?{encoded}"
|
|
|
|
|
|
def get_user_from_token(access_token):
    """Fetch userinfo for an access token and return the matching local user.

    The HopID userinfo endpoint is called with the token as a bearer
    credential. The user is looked up by e-mail and created with a
    lower-cased username when missing.

    Args:
        access_token: Bearer token sent to
            ``{settings.HOPID_URL}/users/userinfo/``.

    Returns:
        A user model instance, or ``None`` when the request fails, the
        response is not a 200 with a JSON object body, or the payload lacks
        ``email`` or ``username``.
    """
    userinfo_url = f"{settings.HOPID_URL}/users/userinfo/"
    headers = {'Authorization': f'Bearer {access_token}'}

    try:
        userinfo_response = requests.get(userinfo_url, headers=headers, timeout=5)
        if userinfo_response.status_code != 200:
            return None
        # Parsed inside the try so a 200 response with a non-JSON body
        # yields None instead of raising to the caller.
        userinfo = userinfo_response.json()
    except (requests.RequestException, ValueError):
        return None

    if not isinstance(userinfo, dict):
        return None

    email = userinfo.get('email')
    username = userinfo.get('username')
    if not email or not username:
        return None

    profile, _ = get_user_model().objects.get_or_create(
        email=email,
        defaults={'username': username.lower()},
    )
    return profile
|
|
|
|
|
|
def refresh_jwt_tokens(refresh_token):
    """Redeem a refresh token at the HopID token endpoint.

    Args:
        refresh_token: The refresh token to redeem.

    Returns:
        The parsed JSON body of a 200 response. On any other status the dict
        ``{"error": "Refresh failed"}`` is returned; when the request raises
        ``requests.RequestException`` the dict
        ``{"error": "Token refresh request failed"}`` is returned.
    """
    endpoint = f"{settings.HOPID_URL}/o/token/"
    form = dict(
        grant_type='refresh_token',
        refresh_token=refresh_token,
        client_id=settings.HOPID_CLIENT_ID,
        client_secret=settings.HOPID_CLIENT_SECRET,
    )

    try:
        reply = requests.post(endpoint, data=form, timeout=5)
        if reply.status_code == 200:
            return reply.json()
        return {"error": "Refresh failed"}
    except requests.RequestException:
        return {"error": "Token refresh request failed"}
|
|
|
|
|
|
def get_jwt_tokens(code, code_verifier, next=None):
    """Exchange an authorization code (with PKCE verifier) for tokens.

    The redirect URI sent to the token endpoint is
    ``settings.HOPID_CLIENT_CALLBACK_URI`` when truthy, otherwise
    ``settings.HOPID_CLIENT_URI`` followed by ``/id/callback/``. A truthy
    ``next`` is appended to it as a URL-encoded ``next`` query parameter.

    Args:
        code: Authorization code received on the callback.
        code_verifier: PKCE verifier belonging to the original challenge.
        next: Optional post-login target included in the redirect URI.

    Returns:
        The token dict parsed from a 200 response. Every failure is reported
        as a dict with an ``"error"`` key: the request raising, a non-200
        status, or a body that is not a JSON object.
    """
    token_url = f"{settings.HOPID_URL}/o/token/"
    redirect_uri = (
        settings.HOPID_CLIENT_CALLBACK_URI
        or settings.HOPID_CLIENT_URI + '/id/callback/'
    )

    if next:
        redirect_uri += f'?{urlencode({"next": next})}'

    token_data = {
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': redirect_uri,
        'client_id': settings.HOPID_CLIENT_ID,
        'client_secret': settings.HOPID_CLIENT_SECRET,
        'code_verifier': code_verifier,
        'scope': 'openid profile email offline_access',
    }

    try:
        token_response = requests.post(token_url, data=token_data, timeout=5)
    except requests.RequestException:
        return {"error": "Token request failed"}

    if token_response.status_code != 200:
        return {"error": "Token exchange failed"}

    # A 200 response with an unparsable or non-object body must follow the
    # same error-dict contract instead of raising to the caller.
    try:
        tokens = token_response.json()
    except (requests.RequestException, ValueError):
        return {"error": "Token response was not valid JSON"}

    if not isinstance(tokens, dict):
        return {"error": "Token response was not valid JSON"}

    return tokens
|
|
|
|
|
|
def generate_pkce_pair():
    """Create a PKCE verifier and its S256 challenge.

    Returns:
        A ``(verifier, challenge)`` tuple of strings. The verifier is a
        URL-safe random token built from 64 random bytes; the challenge is
        the URL-safe base64 encoding of the verifier's SHA-256 digest with
        the ``=`` padding removed.
    """
    code_verifier = secrets.token_urlsafe(64)
    digest = hashlib.sha256(code_verifier.encode()).digest()
    code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip('=')
    return code_verifier, code_challenge
|
|
|
|
|
|
def get_hopid_login_url(request, method=None, next=None):
    """Build the HopID authorization URL and stash PKCE/nonce state in the session.

    A fresh nonce and PKCE pair are generated on every call. The verifier is
    stored under ``request.session['pkce_verifier']`` and the nonce under
    ``request.session['oidc_nonce']``.

    Args:
        request: Object exposing a mutable ``session`` mapping.
        method: Optional value sent lower-cased as the ``method`` query
            parameter.
        next: Optional target appended to the redirect URI as a URL-encoded
            ``next`` query parameter.

    Returns:
        The ``/o/authorize/`` URL with all parameters URL-encoded.
    """
    oidc_nonce = secrets.token_urlsafe(32)
    pkce_verifier, pkce_challenge = generate_pkce_pair()

    session = request.session
    session['pkce_verifier'] = pkce_verifier
    session['oidc_nonce'] = oidc_nonce

    authorize_endpoint = f"{settings.HOPID_URL}/o/authorize/"

    # Prefer the explicitly configured callback; otherwise derive it from
    # the client base URI.
    callback = settings.HOPID_CLIENT_CALLBACK_URI
    if not callback:
        callback = settings.HOPID_CLIENT_URI + '/id/callback/'
    if next:
        callback = callback + '?' + urlencode({"next": next})

    query = {
        "client_id": settings.HOPID_CLIENT_ID,
        "response_type": "code",
        "scope": "openid profile email offline_access",
        "redirect_uri": callback,
        "nonce": oidc_nonce,
        "code_challenge": pkce_challenge,
        "code_challenge_method": "S256",
    }
    if method:
        query['method'] = method.lower()

    return authorize_endpoint + "?" + urlencode(query)
|