simple_auth.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
import hashlib
import hmac
import logging
import time
from typing import Any, Dict, Optional

import httpx

from backend.app.core.config import settings
  7. class SimpleAuthService:
  8. @staticmethod
  9. def generate_signature(params: Dict[str, Any], secret: str) -> str:
  10. """
  11. Generate signature based on Simple Auth Guide.
  12. 1. Filter out 'sign' and None values.
  13. 2. Sort by key.
  14. 3. Concat as query string.
  15. 4. HMAC-SHA256.
  16. """
  17. data = {k: v for k, v in params.items() if k != "sign" and v is not None}
  18. sorted_keys = sorted(data.keys())
  19. query_string = "&".join([f"{k}={data[k]}" for k in sorted_keys])
  20. signature = hmac.new(
  21. secret.encode('utf-8'),
  22. query_string.encode('utf-8'),
  23. hashlib.sha256
  24. ).hexdigest()
  25. return signature
  26. @staticmethod
  27. async def validate_ticket(ticket: str) -> Optional[Dict[str, Any]]:
  28. """
  29. Validate ticket with the auth server.
  30. """
  31. timestamp = int(time.time())
  32. params = {
  33. "app_id": settings.SIMPLE_AUTH_APP_ID,
  34. "ticket": ticket,
  35. "timestamp": timestamp,
  36. }
  37. # Calculate sign
  38. sign = SimpleAuthService.generate_signature(params, settings.SIMPLE_AUTH_APP_SECRET)
  39. params["sign"] = sign
  40. url = f"{settings.SIMPLE_AUTH_BASE_URL}/validate"
  41. async with httpx.AsyncClient() as client:
  42. try:
  43. response = await client.post(url, json=params)
  44. if response.status_code == 200:
  45. result = response.json()
  46. if result.get("valid") is True:
  47. return result
  48. return None
  49. except Exception as e:
  50. print(f"Error validating ticket: {e}")
  51. return None