import logging import json import base64 from typing import List, Optional, Dict, Any import ory_hydra_client from ory_hydra_client.api import o_auth2_api from ory_hydra_client.models.accept_o_auth2_login_request import AcceptOAuth2LoginRequest from ory_hydra_client.models.reject_o_auth2_request import RejectOAuth2Request from ory_hydra_client.models.accept_o_auth2_consent_request import AcceptOAuth2ConsentRequest import requests from app.core.hydra_config import hydra_settings logger = logging.getLogger(__name__) def decode_jwt_payload(token: str) -> Optional[Dict[str, Any]]: """ 解析 JWT token 的 payload(不验证签名,仅用于日志记录)。 用于打印 id_token 的内容。 """ try: # JWT 格式: header.payload.signature parts = token.split('.') if len(parts) != 3: return None # 解码 payload(第二部分) payload_b64 = parts[1] # 添加 padding 如果需要 padding = len(payload_b64) % 4 if padding: payload_b64 += '=' * (4 - padding) payload_bytes = base64.urlsafe_b64decode(payload_b64) payload_str = payload_bytes.decode('utf-8') return json.loads(payload_str) except Exception as e: logger.warning(f"Failed to decode JWT payload: {e}") return None class HydraService: def __init__(self): configuration = ory_hydra_client.Configuration( host=hydra_settings.HYDRA_ADMIN_URL ) self.api_client = ory_hydra_client.ApiClient(configuration) self.oauth2 = o_auth2_api.OAuth2Api(self.api_client) # Admin REST base,用于调用 /clients 等管理接口 self.admin_base = hydra_settings.HYDRA_ADMIN_URL.rstrip("/") # ---------- OAuth2 Client 管理 ---------- def create_or_update_client( self, client_id: str, client_secret: str, redirect_uris: List[str], client_name: str = "", ) -> None: """ 在 Hydra 中创建或更新一个 OAuth2 Client,使其 client_id 与应用 app_id 对齐。 - 如果已存在同名 Client,则先尝试删除再重建(简单幂等实现)。 - grant_types / scope 等使用平台推荐的默认配置。 """ if not redirect_uris: logger.warning( "尝试在 Hydra 创建 Client 时未提供 redirect_uris,client_id=%s", client_id, ) payload = { "client_id": client_id, "client_secret": client_secret, "client_name": client_name or client_id, "redirect_uris": redirect_uris, "grant_types": ["authorization_code", "refresh_token"], "response_types": ["code"], "scope": "openid offline offline_access profile email", "token_endpoint_auth_method": "client_secret_basic", } # 1. 尝试删除已有的同名 Client(容错即可) try: resp_del = requests.delete( f"{self.admin_base}/admin/clients/{client_id}", timeout=5 ) if resp_del.status_code not in (200, 204, 404): logger.warning( "删除 Hydra Client 可能失败(忽略继续创建): status=%s, body=%s", resp_del.status_code, resp_del.text, ) except Exception as e: logger.warning("删除 Hydra Client 异常(忽略): %s", e) # 2. 创建新的 Client resp = requests.post( f"{self.admin_base}/admin/clients", json=payload, timeout=5 ) if resp.status_code not in (200, 201): logger.error( "在 Hydra 创建 Client 失败: status=%s, body=%s, url=%s, payload=%s", resp.status_code, resp.text, f"{self.admin_base}/admin/clients", payload, ) raise Exception(f"创建 Hydra OIDC Client 失败: {resp.text}") def get_login_request(self, challenge: str): try: return self.oauth2.get_o_auth2_login_request(challenge) except Exception as e: logger.error(f"获取登录请求失败 (challenge: {challenge}): {e}") raise def accept_login_request(self, challenge: str, subject: str): body = AcceptOAuth2LoginRequest( subject=subject, remember=True, remember_for=3600, ) try: logger.info(f"接受登录请求 (subject: {subject}, challenge: {challenge})") return self.oauth2.accept_o_auth2_login_request(challenge, accept_o_auth2_login_request=body) except Exception as e: logger.error(f"接受登录请求失败 (challenge: {challenge}): {e}") raise def reject_login_request(self, challenge: str, error: str, error_description: str): body = RejectOAuth2Request( error=error, error_description=error_description ) try: logger.info(f"拒绝登录请求 (challenge: {challenge}, error: {error})") return self.oauth2.reject_o_auth2_login_request(challenge, reject_o_auth2_request=body) except Exception as e: logger.error(f"拒绝登录请求失败 (challenge: {challenge}): {e}") raise def get_consent_request(self, challenge: str): try: return self.oauth2.get_o_auth2_consent_request(challenge) except Exception as e: logger.error(f"获取同意请求失败 (challenge: {challenge}): {e}") raise def accept_consent_request(self, challenge: str, grant_scope: list, id_token_claims: dict): body = AcceptOAuth2ConsentRequest( grant_scope=grant_scope, grant_access_token_audience=[], remember=True, remember_for=3600, session={"id_token": id_token_claims} ) try: # 详细记录注入的 claims(格式化 JSON) claims_json = json.dumps(id_token_claims, ensure_ascii=False, indent=2) logger.info( f"接受同意请求 (challenge: {challenge}, scope: {grant_scope})\n" f"注入到 id_token 的 claims:\n{claims_json}" ) result = self.oauth2.accept_o_auth2_consent_request(challenge, accept_o_auth2_consent_request=body) # 如果返回结果中包含 redirect_to,尝试从 URL 参数中提取可能的 token(仅用于调试) # 注意:实际的 id_token 是在 token 交换时才会生成 if hasattr(result, 'redirect_to') and result.redirect_to: logger.debug(f"Consent accepted, redirect_to: {result.redirect_to}") return result except Exception as e: logger.error(f"接受同意请求失败 (challenge: {challenge}): {e}") raise def reject_consent_request(self, challenge: str, error: str, error_description: str): body = RejectOAuth2Request( error=error, error_description=error_description ) try: logger.info(f"拒绝同意请求 (challenge: {challenge}, error: {error})") return self.oauth2.reject_o_auth2_consent_request(challenge, reject_o_auth2_request=body) except Exception as e: logger.error(f"拒绝同意请求失败 (challenge: {challenge}): {e}") raise def get_logout_request(self, challenge: str): try: resp = requests.get( f"{self.admin_base}/admin/oauth2/auth/requests/logout", params={"logout_challenge": challenge}, timeout=5 ) resp.raise_for_status() return resp.json() except Exception as e: logger.error(f"获取登出请求失败 (challenge: {challenge}): {e}") raise def accept_logout_request(self, challenge: str): try: resp = requests.put( f"{self.admin_base}/admin/oauth2/auth/requests/logout/accept", params={"logout_challenge": challenge}, timeout=5 ) resp.raise_for_status() logger.info(f"接受登出请求 (challenge: {challenge})") return resp.json() except Exception as e: logger.error(f"接受登出请求失败 (challenge: {challenge}): {e}") raise def log_id_token_content(self, id_token: str, context: str = ""): """ 解析并打印 id_token 的内容(用于调试)。 Args: id_token: JWT 格式的 id_token context: 上下文信息(如 "token exchange") """ payload = decode_jwt_payload(id_token) if payload: payload_json = json.dumps(payload, ensure_ascii=False, indent=2) logger.info( f"ID Token 内容 {context}:\n{payload_json}" ) else: logger.warning(f"无法解析 id_token {context}") hydra_service = HydraService()