|
|
@@ -0,0 +1,77 @@
|
|
|
+"""
|
|
|
+统一登录平台 (UAP) 消息推送服务
|
|
|
+用于向运维人员发送设备自检等通知消息
|
|
|
+"""
|
|
|
+import hmac
|
|
|
+import hashlib
|
|
|
+import time
|
|
|
+from typing import List
|
|
|
+
|
|
|
+import requests
|
|
|
+from utils.logger_config import logger
|
|
|
+
|
|
|
+# 复用 auth 中的 SSO 配置
|
|
|
+from api.auth import SSO_APP_ID, SSO_APP_SECRET
|
|
|
+
|
|
|
+UAP_MESSAGES_URL = 'https://api.hnyunzhu.com/api/v1/messages/'
|
|
|
+
|
|
|
+# 运维人员账号列表(统一登录平台 app_user_id)
|
|
|
+OPS_USER_IDS: List[str] = ['liuq', 'longb']
|
|
|
+
|
|
|
+# 自检页面链接(生产环境需配置为实际域名)
|
|
|
+SELF_CHECK_BASE_URL = 'http://localhost:5050'
|
|
|
+
|
|
|
+
|
|
|
+def _generate_signature(secret: str, params: dict) -> str:
|
|
|
+ """生成 HMAC-SHA256 签名(与 auth 中规则一致)"""
|
|
|
+ data = {k: v for k, v in params.items() if k != "sign" and v is not None}
|
|
|
+ sorted_keys = sorted(data.keys())
|
|
|
+ query_string = "&".join([f"{k}={data[k]}" for k in sorted_keys])
|
|
|
+ return hmac.new(
|
|
|
+ secret.encode('utf-8'),
|
|
|
+ query_string.encode('utf-8'),
|
|
|
+ hashlib.sha256
|
|
|
+ ).hexdigest()
|
|
|
+
|
|
|
+
|
|
|
+def send_self_check_notification(content: str, target_url: str = None) -> None:
|
|
|
+ """
|
|
|
+ 向运维人员发送自检通知消息
|
|
|
+ :param content: 消息内容(如:总设备数 X,在线 Y,离线 Z)
|
|
|
+ :param target_url: 点击跳转地址,默认 /self_check
|
|
|
+ """
|
|
|
+ if target_url is None:
|
|
|
+ target_url = f"{SELF_CHECK_BASE_URL}/self_check"
|
|
|
+
|
|
|
+ timestamp = int(time.time())
|
|
|
+ params = {"app_id": SSO_APP_ID, "timestamp": timestamp}
|
|
|
+ sign = _generate_signature(SSO_APP_SECRET, params)
|
|
|
+
|
|
|
+ headers = {
|
|
|
+ "Content-Type": "application/json",
|
|
|
+ "X-App-Id": SSO_APP_ID,
|
|
|
+ "X-Timestamp": str(timestamp),
|
|
|
+ "X-Sign": sign,
|
|
|
+ }
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ "app_id": SSO_APP_ID,
|
|
|
+ "type": "NOTIFICATION",
|
|
|
+ "content_type": "TEXT",
|
|
|
+ "title": "韫珠展厅设备每日自检信息",
|
|
|
+ "content": content,
|
|
|
+ "auto_sso": True,
|
|
|
+ "target_url": target_url,
|
|
|
+ "action_text": "前往查看",
|
|
|
+ }
|
|
|
+
|
|
|
+ for app_user_id in OPS_USER_IDS:
|
|
|
+ try:
|
|
|
+ body = {**payload, "app_user_id": app_user_id}
|
|
|
+ resp = requests.post(UAP_MESSAGES_URL, json=body, headers=headers, timeout=10)
|
|
|
+ if resp.ok:
|
|
|
+ logger.info(f"[UAP] 自检通知已发送至 {app_user_id}")
|
|
|
+ else:
|
|
|
+ logger.warning(f"[UAP] 发送通知失败 {app_user_id}: status={resp.status_code}, body={resp.text}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"[UAP] 发送通知异常 {app_user_id}: {e}")
|