| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import logging
- import ory_hydra_client
- from ory_hydra_client.api import o_auth2_api
- from ory_hydra_client.models.accept_o_auth2_login_request import AcceptOAuth2LoginRequest
- from ory_hydra_client.models.reject_o_auth2_request import RejectOAuth2Request
- from ory_hydra_client.models.accept_o_auth2_consent_request import AcceptOAuth2ConsentRequest
- from ory_hydra_client.models.o_auth2_consent_session import OAuth2ConsentSession
- from app.core.hydra_config import hydra_settings
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class HydraService:
    """Wrapper around the Ory Hydra admin OAuth2 API client.

    Every method forwards to the matching ``OAuth2Api`` call. Failures are
    logged and then re-raised unchanged, so callers decide how to handle
    them; accept/reject calls additionally log the attempt at INFO level.
    """

    # Default ``remember_for`` value sent when accepting a login or consent
    # request (interpreted by Hydra as seconds, per the Hydra API docs).
    DEFAULT_REMEMBER_FOR = 3600

    def __init__(self) -> None:
        """Build the API client against ``hydra_settings.HYDRA_ADMIN_URL``."""
        configuration = ory_hydra_client.Configuration(
            host=hydra_settings.HYDRA_ADMIN_URL,
        )
        self.api_client = ory_hydra_client.ApiClient(configuration)
        self.oauth2 = o_auth2_api.OAuth2Api(self.api_client)

    def get_login_request(self, challenge: str):
        """Fetch the login request identified by ``challenge``.

        Args:
            challenge: Login challenge string received from Hydra.

        Returns:
            Whatever ``OAuth2Api.get_o_auth2_login_request`` returns.

        Raises:
            Exception: Any error raised by the client is logged and re-raised.
        """
        try:
            return self.oauth2.get_o_auth2_login_request(challenge)
        except Exception as e:
            logger.error(f"获取登录请求失败 (challenge: {challenge}): {e}")
            raise

    def accept_login_request(
        self,
        challenge: str,
        subject: str,
        *,
        remember: bool = True,
        remember_for: int = DEFAULT_REMEMBER_FOR,
    ):
        """Accept the login request for ``challenge`` on behalf of ``subject``.

        Args:
            challenge: Login challenge string received from Hydra.
            subject: Identifier of the authenticated user.
            remember: Forwarded as ``remember`` in the accept body.
            remember_for: Forwarded as ``remember_for`` in the accept body.

        Returns:
            Whatever ``OAuth2Api.accept_o_auth2_login_request`` returns.

        Raises:
            Exception: Any error raised by the client is logged and re-raised.
        """
        body = AcceptOAuth2LoginRequest(
            subject=subject,
            remember=remember,
            remember_for=remember_for,
        )
        try:
            logger.info(f"接受登录请求 (subject: {subject}, challenge: {challenge})")
            return self.oauth2.accept_o_auth2_login_request(
                challenge,
                accept_o_auth2_login_request=body,
            )
        except Exception as e:
            logger.error(f"接受登录请求失败 (challenge: {challenge}): {e}")
            raise

    def reject_login_request(self, challenge: str, error: str, error_description: str):
        """Reject the login request for ``challenge``.

        Args:
            challenge: Login challenge string received from Hydra.
            error: Error code placed in the reject body.
            error_description: Description placed in the reject body.

        Returns:
            Whatever ``OAuth2Api.reject_o_auth2_login_request`` returns.

        Raises:
            Exception: Any error raised by the client is logged and re-raised.
        """
        body = RejectOAuth2Request(
            error=error,
            error_description=error_description,
        )
        try:
            logger.info(f"拒绝登录请求 (challenge: {challenge}, error: {error})")
            return self.oauth2.reject_o_auth2_login_request(
                challenge,
                reject_o_auth2_request=body,
            )
        except Exception as e:
            logger.error(f"拒绝登录请求失败 (challenge: {challenge}): {e}")
            raise

    def get_consent_request(self, challenge: str):
        """Fetch the consent request identified by ``challenge``.

        Args:
            challenge: Consent challenge string received from Hydra.

        Returns:
            Whatever ``OAuth2Api.get_o_auth2_consent_request`` returns.

        Raises:
            Exception: Any error raised by the client is logged and re-raised.
        """
        try:
            return self.oauth2.get_o_auth2_consent_request(challenge)
        except Exception as e:
            logger.error(f"获取同意请求失败 (challenge: {challenge}): {e}")
            raise

    def accept_consent_request(
        self,
        challenge: str,
        grant_scope: list,
        id_token_claims: dict,
        *,
        remember: bool = True,
        remember_for: int = DEFAULT_REMEMBER_FOR,
    ):
        """Accept the consent request for ``challenge``.

        The granted access-token audience is always sent as an empty list.

        Args:
            challenge: Consent challenge string received from Hydra.
            grant_scope: Scopes to grant.
            id_token_claims: Claims placed in the session's ``id_token``.
            remember: Forwarded as ``remember`` in the accept body.
            remember_for: Forwarded as ``remember_for`` in the accept body.

        Returns:
            Whatever ``OAuth2Api.accept_o_auth2_consent_request`` returns.

        Raises:
            Exception: Any error raised by the client is logged and re-raised.
        """
        body = AcceptOAuth2ConsentRequest(
            grant_scope=grant_scope,
            grant_access_token_audience=[],
            remember=remember,
            remember_for=remember_for,
            session=OAuth2ConsentSession(
                id_token=id_token_claims,
            ),
        )
        try:
            logger.info(f"接受同意请求 (challenge: {challenge}, scope: {grant_scope})")
            return self.oauth2.accept_o_auth2_consent_request(
                challenge,
                accept_o_auth2_consent_request=body,
            )
        except Exception as e:
            logger.error(f"接受同意请求失败 (challenge: {challenge}): {e}")
            raise

    def reject_consent_request(self, challenge: str, error: str, error_description: str):
        """Reject the consent request for ``challenge``.

        Args:
            challenge: Consent challenge string received from Hydra.
            error: Error code placed in the reject body.
            error_description: Description placed in the reject body.

        Returns:
            Whatever ``OAuth2Api.reject_o_auth2_consent_request`` returns.

        Raises:
            Exception: Any error raised by the client is logged and re-raised.
        """
        body = RejectOAuth2Request(
            error=error,
            error_description=error_description,
        )
        try:
            logger.info(f"拒绝同意请求 (challenge: {challenge}, error: {error})")
            return self.oauth2.reject_o_auth2_consent_request(
                challenge,
                reject_o_auth2_request=body,
            )
        except Exception as e:
            logger.error(f"拒绝同意请求失败 (challenge: {challenge}): {e}")
            raise
# Module-level HydraService instance, created when this module is imported.
hydra_service = HydraService()
|