146 lines
4.6 KiB
Python
146 lines
4.6 KiB
Python
"""
|
|
MIT License
|
|
|
|
Copyright (c) 2025 Hopsenn
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"), to deal
|
|
in the Software without restriction, including without limitation the rights
|
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
copies of the Software, and to permit persons to whom the Software is
|
|
furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in all
|
|
copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
SOFTWARE.
|
|
"""
|
|
|
|
import hashlib
|
|
import base64
|
|
import secrets
|
|
import requests
|
|
|
|
from . import settings
|
|
from urllib.parse import urlencode
|
|
from django.contrib.auth import get_user_model
|
|
|
|
|
|
def get_hopid_logout_url():
    """Build the URL of the HopID logout endpoint.

    The URL carries a ``next`` query parameter set to
    ``settings.HOPID_CLIENT_LOGOUT_URI`` when that setting is truthy, and to
    ``settings.HOPID_CLIENT_URI`` otherwise (presumably the page HopID sends
    the user to after logging out -- confirm against the HopID server).

    Returns:
        The logout URL as a string, rooted at ``settings.HOPID_URL``.
    """
    landing = settings.HOPID_CLIENT_LOGOUT_URI
    if not landing:
        # No dedicated logout target configured: fall back to the client root.
        landing = settings.HOPID_CLIENT_URI

    encoded = urlencode({"next": landing})
    return f"{settings.HOPID_URL}/users/logout/?{encoded}"
|
|
|
|
|
|
def get_user_from_token(access_token):
    """Resolve a HopID access token to a local user.

    Sends the token as a Bearer credential to the HopID ``/users/userinfo/``
    endpoint and maps the reported ``email``/``username`` onto the active
    Django user model, creating the user when no row with that email exists.

    Args:
        access_token: Token placed in the ``Authorization: Bearer`` header.

    Returns:
        The user instance looked up (or created) by email, or ``None`` when
        the token is empty, the request fails, the response status is not
        200, the body is not a JSON object, or ``email``/``username`` is
        missing or not a non-empty string.
    """
    if not access_token:
        # Nothing to authenticate with; skip the network round trip.
        return None

    userinfo_url = f"{settings.HOPID_URL}/users/userinfo/"
    headers = {'Authorization': f'Bearer {access_token}'}

    try:
        userinfo_response = requests.get(userinfo_url, headers=headers, timeout=5)
    except requests.RequestException:
        return None

    if userinfo_response.status_code != 200:
        return None

    try:
        userinfo = userinfo_response.json()
    except ValueError:
        # A 200 response with a non-JSON body is treated like any other
        # failure. JSON decode errors raised by requests are ValueErrors.
        return None

    if not isinstance(userinfo, dict):
        return None

    email = userinfo.get('email')
    username = userinfo.get('username')

    # Both fields must be usable strings: email is the lookup key and
    # username is lowercased below.
    if not (isinstance(email, str) and email):
        return None
    if not (isinstance(username, str) and username):
        return None

    # The username is only applied when the user is created; an existing
    # user found by email keeps its current username.
    profile, _ = get_user_model().objects.get_or_create(
        email=email,
        defaults={'username': username.lower()},
    )
    return profile
|
|
|
|
|
|
def refresh_jwt_tokens(refresh_token):
    """Exchange a refresh token for new tokens at the HopID token endpoint.

    Posts a ``refresh_token`` grant, together with the configured client id
    and secret, to ``{HOPID_URL}/o/token/``.

    Args:
        refresh_token: The refresh token to redeem.

    Returns:
        The decoded JSON object from the token endpoint on HTTP 200.
        Otherwise a dict with a single ``"error"`` key:
        ``"Refresh failed"`` for a non-200 status, and
        ``"Token refresh request failed"`` when the request itself fails or
        the body cannot be decoded as a JSON object.
    """
    token_url = f"{settings.HOPID_URL}/o/token/"
    token_data = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
        'client_id': settings.HOPID_CLIENT_ID,
        'client_secret': settings.HOPID_CLIENT_SECRET,
    }

    try:
        token_response = requests.post(token_url, data=token_data, timeout=5)
    except requests.RequestException:
        return {"error": "Token refresh request failed"}

    if token_response.status_code != 200:
        return {"error": "Refresh failed"}

    try:
        tokens = token_response.json()
    except ValueError:
        # Depending on the requests version, a JSON decode error may be a
        # plain ValueError rather than a RequestException; catch it here so
        # it never escapes to the caller.
        return {"error": "Token refresh request failed"}

    if not isinstance(tokens, dict):
        # Callers receive a dict on every other path; keep that contract.
        return {"error": "Token refresh request failed"}

    return tokens
|
|
|
|
|
|
def get_jwt_tokens(code, code_verifier, next=None):
    """Exchange an authorization code for tokens at the HopID token endpoint.

    Posts an ``authorization_code`` grant with the PKCE ``code_verifier`` and
    the configured client credentials to ``{HOPID_URL}/o/token/``. The
    ``redirect_uri`` is assembled exactly as in ``get_hopid_login_url``:
    ``settings.HOPID_CLIENT_CALLBACK_URI`` if set, else
    ``settings.HOPID_CLIENT_URI + '/id/callback/'``, with ``?next=...``
    appended when ``next`` is given.

    Args:
        code: Authorization code received on the callback.
        code_verifier: PKCE verifier matching the challenge sent earlier.
        next: Optional value appended to the redirect URI as a ``next``
            query parameter. (The name shadows the builtin but is part of
            the public signature.)

    Returns:
        The decoded JSON object from the token endpoint on HTTP 200.
        Otherwise a dict with a single ``"error"`` key:
        ``"Token request failed"`` when the request itself fails, and
        ``"Token exchange failed"`` for a non-200 status or a body that is
        not a JSON object.
    """
    token_url = f"{settings.HOPID_URL}/o/token/"
    redirect_uri = (
        settings.HOPID_CLIENT_CALLBACK_URI
        or settings.HOPID_CLIENT_URI + '/id/callback/'
    )

    if next:
        redirect_uri += f'?{urlencode({"next": next})}'

    token_data = {
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': redirect_uri,
        'client_id': settings.HOPID_CLIENT_ID,
        'client_secret': settings.HOPID_CLIENT_SECRET,
        'code_verifier': code_verifier,
        'scope': 'openid profile email offline_access',
    }

    try:
        token_response = requests.post(token_url, data=token_data, timeout=5)
    except requests.RequestException:
        return {"error": "Token request failed"}

    if token_response.status_code != 200:
        return {"error": "Token exchange failed"}

    try:
        tokens = token_response.json()
    except ValueError:
        # A 200 response with an undecodable body must not raise; report it
        # through the same error-dict channel as the other failures.
        return {"error": "Token exchange failed"}

    if not isinstance(tokens, dict):
        # Callers receive a dict on every other path; keep that contract.
        return {"error": "Token exchange failed"}

    return tokens
|
|
|
|
|
|
def generate_pkce_pair():
    """Create a fresh PKCE code verifier and its matching code challenge.

    The verifier is a random URL-safe string built from 64 random bytes. The
    challenge is the SHA-256 digest of the verifier, base64url-encoded with
    the trailing ``=`` padding removed.

    Returns:
        A ``(verifier, challenge)`` tuple of strings.
    """
    code_verifier = secrets.token_urlsafe(64)

    digest = hashlib.sha256(code_verifier.encode()).digest()
    encoded = base64.urlsafe_b64encode(digest)
    # Drop the base64 padding before turning the bytes into text.
    code_challenge = encoded.rstrip(b'=').decode()

    return code_verifier, code_challenge
|
|
|
|
|
|
def get_hopid_login_url(request, method=None, next=None):
    """Build the HopID authorization URL and stash the PKCE/nonce state.

    A new nonce and PKCE pair are generated on every call. The verifier and
    the nonce are written to ``request.session`` under ``'pkce_verifier'``
    and ``'oidc_nonce'``; the challenge and nonce go into the URL.

    Args:
        request: Object exposing a mutable ``session`` mapping.
        method: Optional value forwarded, lowercased, as the ``method``
            query parameter (its meaning to HopID is not visible here).
        next: Optional value appended to the redirect URI as a ``next``
            query parameter.

    Returns:
        The full ``{HOPID_URL}/o/authorize/`` URL including its query string.
    """
    oidc_nonce = secrets.token_urlsafe(32)
    code_verifier, code_challenge = generate_pkce_pair()

    session = request.session
    session['pkce_verifier'] = code_verifier
    session['oidc_nonce'] = oidc_nonce

    # Prefer the explicitly configured callback; otherwise derive it from
    # the client base URI.
    callback = settings.HOPID_CLIENT_CALLBACK_URI
    if not callback:
        callback = settings.HOPID_CLIENT_URI + '/id/callback/'
    if next:
        callback = f'{callback}?{urlencode({"next": next})}'

    # Ordered pairs keep the query string in the same order as before.
    query = [
        ("client_id", settings.HOPID_CLIENT_ID),
        ("response_type", "code"),
        ("scope", "openid profile email offline_access"),
        ("redirect_uri", callback),
        ("nonce", oidc_nonce),
        ("code_challenge", code_challenge),
        ("code_challenge_method", "S256"),
    ]
    if method:
        query.append(('method', method.lower()))

    return f"{settings.HOPID_URL}/o/authorize/?{urlencode(query)}"
|