webhook_service.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import httpx
  2. import time
  3. import json
  4. import logging
  5. import hmac
  6. import hashlib
  7. from typing import Dict, Any
  8. from sqlalchemy.orm import Session
  9. from app.models.user import User
  10. from app.models.mapping import AppUserMapping
  11. from app.models.application import Application
  12. logger = logging.getLogger(__name__)
  13. class WebhookService:
  14. @staticmethod
  15. def _generate_signature(secret: str, payload: str, timestamp: int) -> str:
  16. """
  17. Generate X-UAP-Signature: HMAC-SHA256(secret, timestamp + "." + payload)
  18. """
  19. message = f"{timestamp}.{payload}"
  20. return hmac.new(
  21. secret.encode('utf-8'),
  22. message.encode('utf-8'),
  23. hashlib.sha256
  24. ).hexdigest()
  25. @staticmethod
  26. async def send_notification(url: str, secret: str, event_type: str, user_data: Dict[str, Any]):
  27. timestamp = int(time.time())
  28. payload_dict = {
  29. "event_type": event_type, # ADD, UPDATE, DISABLE
  30. "timestamp": timestamp,
  31. "data": user_data
  32. }
  33. payload_json = json.dumps(payload_dict)
  34. signature = WebhookService._generate_signature(secret, payload_json, timestamp)
  35. headers = {
  36. "Content-Type": "application/json",
  37. "X-UAP-Signature": signature,
  38. "X-UAP-Timestamp": str(timestamp)
  39. }
  40. async with httpx.AsyncClient() as client:
  41. try:
  42. response = await client.post(url, content=payload_json, headers=headers, timeout=5.0)
  43. response.raise_for_status()
  44. logger.info(f"Webhook sent to {url}. Status: {response.status_code}")
  45. except Exception as e:
  46. logger.error(f"Failed to send webhook to {url}: {str(e)}")
  47. @staticmethod
  48. async def trigger_user_event(db: Session, user_id: int, event_type: str):
  49. """
  50. Finds all apps mapped to this user and sends webhooks if configured.
  51. """
  52. user = db.query(User).filter(User.id == user_id).first()
  53. if not user:
  54. return
  55. mappings = db.query(AppUserMapping).filter(AppUserMapping.user_id == user_id).all()
  56. for mapping in mappings:
  57. app = mapping.application
  58. if app.notification_url:
  59. # Construct data specific to the app (e.g. including mapped_key)
  60. user_data = {
  61. "user_id": user.id,
  62. "mobile": user.mobile,
  63. "status": user.status,
  64. "mapped_key": mapping.mapped_key
  65. }
  66. # Note: app.app_secret is used for signing.
  67. # If app_secret is stored as hash, we technically CANNOT sign it securely
  68. # unless we stored the plain secret or a separate webhook secret.
  69. # Per previous step, we changed Application to store plain `app_secret`.
  70. await WebhookService.send_notification(
  71. app.notification_url,
  72. app.app_secret,
  73. event_type,
  74. user_data
  75. )