Add refresh token
This commit is contained in:
parent
8409f36dd1
commit
197068ebfa
@ -61,6 +61,24 @@ def get_user_from_token(access_token):
|
||||
return profile
|
||||
|
||||
|
||||
def refresh_jwt_tokens(refresh_token):
|
||||
token_url = f"{settings.HOPID_URL}/o/token/"
|
||||
token_data = {
|
||||
'grant_type': 'refresh_token',
|
||||
'refresh_token': refresh_token,
|
||||
'client_id': settings.HOPID_CLIENT_ID,
|
||||
'client_secret': settings.HOPID_CLIENT_SECRET,
|
||||
}
|
||||
|
||||
try:
|
||||
token_response = requests.post(token_url, data=token_data, timeout=5)
|
||||
if token_response.status_code != 200:
|
||||
return {"error": "Refresh failed"}
|
||||
return token_response.json()
|
||||
except requests.RequestException:
|
||||
return {"error": "Token refresh request failed"}
|
||||
|
||||
|
||||
def get_jwt_tokens(code, code_verifier, next=None):
|
||||
token_url = f"{settings.HOPID_URL}/o/token/"
|
||||
redirect_uri = settings.HOPID_CLIENT_CALLBACK_URI or settings.HOPID_CLIENT_URI + f'/id/callback/'
|
||||
@ -75,6 +93,7 @@ def get_jwt_tokens(code, code_verifier, next=None):
|
||||
'client_id': settings.HOPID_CLIENT_ID,
|
||||
'client_secret': settings.HOPID_CLIENT_SECRET,
|
||||
'code_verifier': code_verifier,
|
||||
'scope': 'openid profile email offline_access'
|
||||
}
|
||||
|
||||
try:
|
||||
@ -113,7 +132,7 @@ def get_hopid_login_url(request, method=None, next=None):
|
||||
params = {
|
||||
"client_id": settings.HOPID_CLIENT_ID,
|
||||
"response_type": "code",
|
||||
"scope": "openid profile email",
|
||||
"scope": "openid profile email offline_access",
|
||||
"redirect_uri": redirect_uri,
|
||||
"nonce": nonce,
|
||||
"code_challenge": challenge,
|
||||
|
@ -28,29 +28,39 @@ from jose import jwt, JWTError
|
||||
from . import settings
|
||||
|
||||
|
||||
|
||||
def verify_id_token(id_token, access_token):
|
||||
jwks = requests.get(f"{settings.HOPID_URL}/.well-known/jwks.json").json()
|
||||
"""
|
||||
Verifies the given ID token using the JWKS from the HopID server.
|
||||
Returns claims if valid, raises JWTError otherwise.
|
||||
"""
|
||||
|
||||
jwks_url = f"{settings.HOPID_URL}/.well-known/jwks.json"
|
||||
|
||||
try:
|
||||
# Get unverified header to find 'kid'
|
||||
jwks = requests.get(jwks_url, timeout=5).json()
|
||||
|
||||
unverified_header = jwt.get_unverified_header(id_token)
|
||||
kid = unverified_header['kid']
|
||||
kid = unverified_header.get('kid')
|
||||
|
||||
# Find the matching key
|
||||
key = next(k for k in jwks['keys'] if k['kid'] == kid)
|
||||
issuer = settings.HOPID_URL.rstrip('/')
|
||||
if not kid:
|
||||
raise JWTError("Missing 'kid' in ID token header")
|
||||
|
||||
key = next((k for k in jwks.get('keys', []) if k.get('kid') == kid), None)
|
||||
if not key:
|
||||
raise JWTError(f"No matching public key found for kid: {kid}")
|
||||
|
||||
# Decode & verify
|
||||
claims = jwt.decode(
|
||||
id_token,
|
||||
key=key,
|
||||
algorithms=['RS256'],
|
||||
algorithms=["RS256"],
|
||||
audience=settings.HOPID_CLIENT_ID,
|
||||
issuer=issuer,
|
||||
issuer=settings.HOPID_URL.rstrip('/'),
|
||||
access_token=access_token
|
||||
)
|
||||
|
||||
return claims # Validated claims
|
||||
return claims
|
||||
|
||||
except JWTError as e:
|
||||
print("Invalid id_token:", e)
|
||||
except (requests.RequestException, KeyError, JWTError) as e:
|
||||
print(f"ID token verification failed: {e}")
|
||||
raise
|
||||
|
Loading…
x
Reference in New Issue
Block a user