| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- import httpx
- import time
- import json
- import logging
- import hmac
- import hashlib
- from typing import Dict, Any
- from sqlalchemy.orm import Session
- from app.models.user import User
- from app.models.mapping import AppUserMapping
- from app.models.application import Application
- logger = logging.getLogger(__name__)
- class WebhookService:
- @staticmethod
- def _generate_signature(secret: str, payload: str, timestamp: int) -> str:
- """
- Generate X-UAP-Signature: HMAC-SHA256(secret, timestamp + "." + payload)
- """
- message = f"{timestamp}.{payload}"
- return hmac.new(
- secret.encode('utf-8'),
- message.encode('utf-8'),
- hashlib.sha256
- ).hexdigest()
- @staticmethod
- async def send_notification(url: str, secret: str, event_type: str, user_data: Dict[str, Any]):
- timestamp = int(time.time())
- payload_dict = {
- "event_type": event_type, # ADD, UPDATE, DISABLE
- "timestamp": timestamp,
- "data": user_data
- }
- payload_json = json.dumps(payload_dict)
- signature = WebhookService._generate_signature(secret, payload_json, timestamp)
- headers = {
- "Content-Type": "application/json",
- "X-UAP-Signature": signature,
- "X-UAP-Timestamp": str(timestamp)
- }
- async with httpx.AsyncClient() as client:
- try:
- response = await client.post(url, content=payload_json, headers=headers, timeout=5.0)
- response.raise_for_status()
- logger.info(f"Webhook sent to {url}. Status: {response.status_code}")
- except Exception as e:
- logger.error(f"Failed to send webhook to {url}: {str(e)}")
- @staticmethod
- async def trigger_user_event(db: Session, user_id: int, event_type: str):
- """
- Finds all apps mapped to this user and sends webhooks if configured.
- """
- user = db.query(User).filter(User.id == user_id).first()
- if not user:
- return
- mappings = db.query(AppUserMapping).filter(AppUserMapping.user_id == user_id).all()
-
- for mapping in mappings:
- app = mapping.application
- if app.notification_url:
- # Construct data specific to the app (e.g. including mapped_key)
- user_data = {
- "user_id": user.id,
- "mobile": user.mobile,
- "status": user.status,
- "mapped_key": mapping.mapped_key
- }
- # Note: app.app_secret is used for signing.
- # If app_secret is stored as hash, we technically CANNOT sign it securely
- # unless we stored the plain secret or a separate webhook secret.
- # Per previous step, we changed Application to store plain `app_secret`.
- await WebhookService.send_notification(
- app.notification_url,
- app.app_secret,
- event_type,
- user_data
- )
|