| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- import logging
- import json
- import base64
- from typing import List, Optional, Dict, Any
- import ory_hydra_client
- from ory_hydra_client.api import o_auth2_api
- from ory_hydra_client.models.accept_o_auth2_login_request import AcceptOAuth2LoginRequest
- from ory_hydra_client.models.reject_o_auth2_request import RejectOAuth2Request
- from ory_hydra_client.models.accept_o_auth2_consent_request import AcceptOAuth2ConsentRequest
- import requests
- from app.core.hydra_config import hydra_settings
- logger = logging.getLogger(__name__)
- def decode_jwt_payload(token: str) -> Optional[Dict[str, Any]]:
- """
- 解析 JWT token 的 payload(不验证签名,仅用于日志记录)。
- 用于打印 id_token 的内容。
- """
- try:
- # JWT 格式: header.payload.signature
- parts = token.split('.')
- if len(parts) != 3:
- return None
-
- # 解码 payload(第二部分)
- payload_b64 = parts[1]
- # 添加 padding 如果需要
- padding = len(payload_b64) % 4
- if padding:
- payload_b64 += '=' * (4 - padding)
-
- payload_bytes = base64.urlsafe_b64decode(payload_b64)
- payload_str = payload_bytes.decode('utf-8')
- return json.loads(payload_str)
- except Exception as e:
- logger.warning(f"Failed to decode JWT payload: {e}")
- return None
- class HydraService:
- def __init__(self):
- configuration = ory_hydra_client.Configuration(
- host=hydra_settings.HYDRA_ADMIN_URL
- )
- self.api_client = ory_hydra_client.ApiClient(configuration)
- self.oauth2 = o_auth2_api.OAuth2Api(self.api_client)
- # Admin REST base,用于调用 /clients 等管理接口
- self.admin_base = hydra_settings.HYDRA_ADMIN_URL.rstrip("/")
- # ---------- OAuth2 Client 管理 ----------
- def create_or_update_client(
- self,
- client_id: str,
- client_secret: str,
- redirect_uris: List[str],
- client_name: str = "",
- ) -> None:
- """
- 在 Hydra 中创建或更新一个 OAuth2 Client,使其 client_id 与应用 app_id 对齐。
- - 如果已存在同名 Client,则先尝试删除再重建(简单幂等实现)。
- - grant_types / scope 等使用平台推荐的默认配置。
- """
- if not redirect_uris:
- logger.warning(
- "尝试在 Hydra 创建 Client 时未提供 redirect_uris,client_id=%s",
- client_id,
- )
- payload = {
- "client_id": client_id,
- "client_secret": client_secret,
- "client_name": client_name or client_id,
- "redirect_uris": redirect_uris,
- "grant_types": ["authorization_code", "refresh_token"],
- "response_types": ["code"],
- "scope": "openid offline offline_access profile email",
- "token_endpoint_auth_method": "client_secret_basic",
- }
- # 1. 尝试删除已有的同名 Client(容错即可)
- try:
- resp_del = requests.delete(
- f"{self.admin_base}/admin/clients/{client_id}", timeout=5
- )
- if resp_del.status_code not in (200, 204, 404):
- logger.warning(
- "删除 Hydra Client 可能失败(忽略继续创建): status=%s, body=%s",
- resp_del.status_code,
- resp_del.text,
- )
- except Exception as e:
- logger.warning("删除 Hydra Client 异常(忽略): %s", e)
- # 2. 创建新的 Client
- resp = requests.post(
- f"{self.admin_base}/admin/clients", json=payload, timeout=5
- )
- if resp.status_code not in (200, 201):
- logger.error(
- "在 Hydra 创建 Client 失败: status=%s, body=%s, url=%s, payload=%s",
- resp.status_code,
- resp.text,
- f"{self.admin_base}/admin/clients",
- payload,
- )
- raise Exception(f"创建 Hydra OIDC Client 失败: {resp.text}")
- def get_login_request(self, challenge: str):
- try:
- return self.oauth2.get_o_auth2_login_request(challenge)
- except Exception as e:
- logger.error(f"获取登录请求失败 (challenge: {challenge}): {e}")
- raise
- def accept_login_request(self, challenge: str, subject: str):
- body = AcceptOAuth2LoginRequest(
- subject=subject,
- remember=True,
- remember_for=3600,
- )
- try:
- logger.info(f"接受登录请求 (subject: {subject}, challenge: {challenge})")
- return self.oauth2.accept_o_auth2_login_request(challenge, accept_o_auth2_login_request=body)
- except Exception as e:
- logger.error(f"接受登录请求失败 (challenge: {challenge}): {e}")
- raise
- def reject_login_request(self, challenge: str, error: str, error_description: str):
- body = RejectOAuth2Request(
- error=error,
- error_description=error_description
- )
- try:
- logger.info(f"拒绝登录请求 (challenge: {challenge}, error: {error})")
- return self.oauth2.reject_o_auth2_login_request(challenge, reject_o_auth2_request=body)
- except Exception as e:
- logger.error(f"拒绝登录请求失败 (challenge: {challenge}): {e}")
- raise
- def get_consent_request(self, challenge: str):
- try:
- return self.oauth2.get_o_auth2_consent_request(challenge)
- except Exception as e:
- logger.error(f"获取同意请求失败 (challenge: {challenge}): {e}")
- raise
- def accept_consent_request(self, challenge: str, grant_scope: list, id_token_claims: dict):
- body = AcceptOAuth2ConsentRequest(
- grant_scope=grant_scope,
- grant_access_token_audience=[],
- remember=True,
- remember_for=3600,
- session={"id_token": id_token_claims}
- )
- try:
- # 详细记录注入的 claims(格式化 JSON)
- claims_json = json.dumps(id_token_claims, ensure_ascii=False, indent=2)
- logger.info(
- f"接受同意请求 (challenge: {challenge}, scope: {grant_scope})\n"
- f"注入到 id_token 的 claims:\n{claims_json}"
- )
- result = self.oauth2.accept_o_auth2_consent_request(challenge, accept_o_auth2_consent_request=body)
-
- # 如果返回结果中包含 redirect_to,尝试从 URL 参数中提取可能的 token(仅用于调试)
- # 注意:实际的 id_token 是在 token 交换时才会生成
- if hasattr(result, 'redirect_to') and result.redirect_to:
- logger.debug(f"Consent accepted, redirect_to: {result.redirect_to}")
-
- return result
- except Exception as e:
- logger.error(f"接受同意请求失败 (challenge: {challenge}): {e}")
- raise
-
- def reject_consent_request(self, challenge: str, error: str, error_description: str):
- body = RejectOAuth2Request(
- error=error,
- error_description=error_description
- )
- try:
- logger.info(f"拒绝同意请求 (challenge: {challenge}, error: {error})")
- return self.oauth2.reject_o_auth2_consent_request(challenge, reject_o_auth2_request=body)
- except Exception as e:
- logger.error(f"拒绝同意请求失败 (challenge: {challenge}): {e}")
- raise
- def get_logout_request(self, challenge: str):
- try:
- resp = requests.get(
- f"{self.admin_base}/admin/oauth2/auth/requests/logout",
- params={"logout_challenge": challenge},
- timeout=5
- )
- resp.raise_for_status()
- return resp.json()
- except Exception as e:
- logger.error(f"获取登出请求失败 (challenge: {challenge}): {e}")
- raise
- def accept_logout_request(self, challenge: str):
- try:
- resp = requests.put(
- f"{self.admin_base}/admin/oauth2/auth/requests/logout/accept",
- params={"logout_challenge": challenge},
- timeout=5
- )
- resp.raise_for_status()
- logger.info(f"接受登出请求 (challenge: {challenge})")
- return resp.json()
- except Exception as e:
- logger.error(f"接受登出请求失败 (challenge: {challenge}): {e}")
- raise
- def log_id_token_content(self, id_token: str, context: str = ""):
- """
- 解析并打印 id_token 的内容(用于调试)。
-
- Args:
- id_token: JWT 格式的 id_token
- context: 上下文信息(如 "token exchange")
- """
- payload = decode_jwt_payload(id_token)
- if payload:
- payload_json = json.dumps(payload, ensure_ascii=False, indent=2)
- logger.info(
- f"ID Token 内容 {context}:\n{payload_json}"
- )
- else:
- logger.warning(f"无法解析 id_token {context}")
- hydra_service = HydraService()
|