|
|
@@ -1,5 +1,7 @@
|
|
|
import logging
|
|
|
-from typing import List
|
|
|
+import json
|
|
|
+import base64
|
|
|
+from typing import List, Optional, Dict, Any
|
|
|
|
|
|
import ory_hydra_client
|
|
|
from ory_hydra_client.api import o_auth2_api
|
|
|
@@ -13,6 +15,32 @@ from app.core.hydra_config import hydra_settings
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
+def decode_jwt_payload(token: str) -> Optional[Dict[str, Any]]:
|
|
|
+ """
|
|
|
+ 解析 JWT token 的 payload(不验证签名,仅用于日志记录)。
|
|
|
+ 用于打印 id_token 的内容。
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # JWT 格式: header.payload.signature
|
|
|
+ parts = token.split('.')
|
|
|
+ if len(parts) != 3:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 解码 payload(第二部分)
|
|
|
+ payload_b64 = parts[1]
|
|
|
+ # 添加 padding 如果需要
|
|
|
+ padding = len(payload_b64) % 4
|
|
|
+ if padding:
|
|
|
+ payload_b64 += '=' * (4 - padding)
|
|
|
+
|
|
|
+ payload_bytes = base64.urlsafe_b64decode(payload_b64)
|
|
|
+ payload_str = payload_bytes.decode('utf-8')
|
|
|
+ return json.loads(payload_str)
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"Failed to decode JWT payload: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
class HydraService:
|
|
|
def __init__(self):
|
|
|
configuration = ory_hydra_client.Configuration(
|
|
|
@@ -127,11 +155,23 @@ class HydraService:
|
|
|
grant_access_token_audience=[],
|
|
|
remember=True,
|
|
|
remember_for=3600,
|
|
|
- session=id_token_claims
|
|
|
+ session={"id_token": id_token_claims}
|
|
|
)
|
|
|
try:
|
|
|
- logger.info(f"接受同意请求 (challenge: {challenge}, scope: {grant_scope})")
|
|
|
- return self.oauth2.accept_o_auth2_consent_request(challenge, accept_o_auth2_consent_request=body)
|
|
|
+ # 详细记录注入的 claims(格式化 JSON)
|
|
|
+ claims_json = json.dumps(id_token_claims, ensure_ascii=False, indent=2)
|
|
|
+ logger.info(
|
|
|
+ f"接受同意请求 (challenge: {challenge}, scope: {grant_scope})\n"
|
|
|
+ f"注入到 id_token 的 claims:\n{claims_json}"
|
|
|
+ )
|
|
|
+ result = self.oauth2.accept_o_auth2_consent_request(challenge, accept_o_auth2_consent_request=body)
|
|
|
+
|
|
|
+ # 如果返回结果中包含 redirect_to,尝试从 URL 参数中提取可能的 token(仅用于调试)
|
|
|
+ # 注意:实际的 id_token 是在 token 交换时才会生成
|
|
|
+ if hasattr(result, 'redirect_to') and result.redirect_to:
|
|
|
+ logger.debug(f"Consent accepted, redirect_to: {result.redirect_to}")
|
|
|
+
|
|
|
+ return result
|
|
|
except Exception as e:
|
|
|
logger.error(f"接受同意请求失败 (challenge: {challenge}): {e}")
|
|
|
raise
|
|
|
@@ -175,4 +215,22 @@ class HydraService:
|
|
|
logger.error(f"接受登出请求失败 (challenge: {challenge}): {e}")
|
|
|
raise
|
|
|
|
|
|
+ def log_id_token_content(self, id_token: str, context: str = ""):
|
|
|
+ """
|
|
|
+ 解析并打印 id_token 的内容(用于调试)。
|
|
|
+
|
|
|
+ Args:
|
|
|
+ id_token: JWT 格式的 id_token
|
|
|
+ context: 上下文信息(如 "token exchange")
|
|
|
+ """
|
|
|
+ payload = decode_jwt_payload(id_token)
|
|
|
+ if payload:
|
|
|
+ payload_json = json.dumps(payload, ensure_ascii=False, indent=2)
|
|
|
+ logger.info(
|
|
|
+ f"ID Token 内容 {context}:\n{payload_json}"
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ logger.warning(f"无法解析 id_token {context}")
|
|
|
+
|
|
|
+
|
|
|
hydra_service = HydraService()
|