| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- import time
- import hmac
- import hashlib
- import httpx
- from typing import Dict, Any, Optional
- from backend.app.core.config import settings
class SimpleAuthService:
    """Helpers for signing requests and validating tickets against the auth server."""

    @staticmethod
    def generate_signature(params: Dict[str, Any], secret: str) -> str:
        """Return the HMAC-SHA256 signature of *params* per the Simple Auth Guide.

        Steps:
            1. Drop the ``sign`` key and every entry whose value is ``None``.
            2. Sort the remaining entries by key.
            3. Join them as ``key=value`` pairs separated by ``&``
               (values are rendered with ``str()``; nothing is URL-encoded).
            4. HMAC-SHA256 the UTF-8 bytes of that string with *secret*.

        Args:
            params: Request parameters to sign.
            secret: Shared secret used as the HMAC key.

        Returns:
            The lowercase hexadecimal digest.
        """
        signable = {
            key: value
            for key, value in params.items()
            if key != "sign" and value is not None
        }
        query_string = "&".join(f"{key}={signable[key]}" for key in sorted(signable))

        return hmac.new(
            secret.encode("utf-8"),
            query_string.encode("utf-8"),
            hashlib.sha256,
        ).hexdigest()

    @staticmethod
    async def validate_ticket(ticket: str) -> Optional[Dict[str, Any]]:
        """Validate *ticket* with the auth server.

        A signed JSON payload (``app_id``, ``ticket``, ``timestamp``, ``sign``)
        is POSTed to ``{SIMPLE_AUTH_BASE_URL}/validate``.

        Args:
            ticket: The ticket string to validate.

        Returns:
            The decoded JSON response when the server answers HTTP 200 with a
            JSON object whose ``valid`` field is exactly ``True``; otherwise
            ``None`` (empty ticket, non-200 status, malformed body, or any
            error while talking to the server).
        """
        # Function-scope import so this block does not depend on a new
        # file-level import.
        import logging

        logger = logging.getLogger(__name__)

        # An empty ticket can never be valid; skip the network round-trip.
        if not ticket:
            return None

        params = {
            "app_id": settings.SIMPLE_AUTH_APP_ID,
            "ticket": ticket,
            "timestamp": int(time.time()),
        }
        # The signature is computed over the payload before "sign" is added.
        params["sign"] = SimpleAuthService.generate_signature(
            params, settings.SIMPLE_AUTH_APP_SECRET
        )

        url = f"{settings.SIMPLE_AUTH_BASE_URL}/validate"

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(url, json=params)
                if response.status_code != 200:
                    logger.warning(
                        "Ticket validation returned HTTP %s", response.status_code
                    )
                    return None
                result = response.json()
            except Exception:
                # Deliberately broad: validation is best-effort and callers
                # treat None as "not authenticated". The traceback is logged
                # so failures are not silent.
                logger.exception("Error validating ticket")
                return None

        # Guard against a JSON body that is not an object (e.g. a list).
        if isinstance(result, dict) and result.get("valid") is True:
            return result
        return None
|