hydra_service.py 3.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import logging
  2. import ory_hydra_client
  3. from ory_hydra_client.api import o_auth2_api
  4. from ory_hydra_client.models.accept_o_auth2_login_request import AcceptOAuth2LoginRequest
  5. from ory_hydra_client.models.reject_o_auth2_request import RejectOAuth2Request
  6. from ory_hydra_client.models.accept_o_auth2_consent_request import AcceptOAuth2ConsentRequest
  7. from ory_hydra_client.models.o_auth2_consent_session import OAuth2ConsentSession
  8. from app.core.hydra_config import hydra_settings
  9. logger = logging.getLogger(__name__)
  10. class HydraService:
  11. def __init__(self):
  12. configuration = ory_hydra_client.Configuration(
  13. host=hydra_settings.HYDRA_ADMIN_URL
  14. )
  15. self.api_client = ory_hydra_client.ApiClient(configuration)
  16. self.oauth2 = o_auth2_api.OAuth2Api(self.api_client)
  17. def get_login_request(self, challenge: str):
  18. try:
  19. return self.oauth2.get_o_auth2_login_request(challenge)
  20. except Exception as e:
  21. logger.error(f"获取登录请求失败 (challenge: {challenge}): {e}")
  22. raise
  23. def accept_login_request(self, challenge: str, subject: str):
  24. body = AcceptOAuth2LoginRequest(
  25. subject=subject,
  26. remember=True,
  27. remember_for=3600,
  28. )
  29. try:
  30. logger.info(f"接受登录请求 (subject: {subject}, challenge: {challenge})")
  31. return self.oauth2.accept_o_auth2_login_request(challenge, accept_o_auth2_login_request=body)
  32. except Exception as e:
  33. logger.error(f"接受登录请求失败 (challenge: {challenge}): {e}")
  34. raise
  35. def reject_login_request(self, challenge: str, error: str, error_description: str):
  36. body = RejectOAuth2Request(
  37. error=error,
  38. error_description=error_description
  39. )
  40. try:
  41. logger.info(f"拒绝登录请求 (challenge: {challenge}, error: {error})")
  42. return self.oauth2.reject_o_auth2_login_request(challenge, reject_o_auth2_request=body)
  43. except Exception as e:
  44. logger.error(f"拒绝登录请求失败 (challenge: {challenge}): {e}")
  45. raise
  46. def get_consent_request(self, challenge: str):
  47. try:
  48. return self.oauth2.get_o_auth2_consent_request(challenge)
  49. except Exception as e:
  50. logger.error(f"获取同意请求失败 (challenge: {challenge}): {e}")
  51. raise
  52. def accept_consent_request(self, challenge: str, grant_scope: list, id_token_claims: dict):
  53. body = AcceptOAuth2ConsentRequest(
  54. grant_scope=grant_scope,
  55. grant_access_token_audience=[],
  56. remember=True,
  57. remember_for=3600,
  58. session=OAuth2ConsentSession(
  59. id_token=id_token_claims
  60. )
  61. )
  62. try:
  63. logger.info(f"接受同意请求 (challenge: {challenge}, scope: {grant_scope})")
  64. return self.oauth2.accept_o_auth2_consent_request(challenge, accept_o_auth2_consent_request=body)
  65. except Exception as e:
  66. logger.error(f"接受同意请求失败 (challenge: {challenge}): {e}")
  67. raise
  68. def reject_consent_request(self, challenge: str, error: str, error_description: str):
  69. body = RejectOAuth2Request(
  70. error=error,
  71. error_description=error_description
  72. )
  73. try:
  74. logger.info(f"拒绝同意请求 (challenge: {challenge}, error: {error})")
  75. return self.oauth2.reject_o_auth2_consent_request(challenge, reject_o_auth2_request=body)
  76. except Exception as e:
  77. logger.error(f"拒绝同意请求失败 (challenge: {challenge}): {e}")
  78. raise
  79. hydra_service = HydraService()