Message_Integration_Guide.md 45 KB

消息中心对接指南

统一消息中心支持应用向用户发送系统通知,并支持用户通过 WebSocket 实时接收消息。本指南将指导您如何接入消息发送与推送服务。

1. 核心概念

  • Message (私信): 用户与用户之间的点对点消息。
  • Notification (通知): 系统或应用发送给用户的业务提醒,支持 SSO 跳转。
  • WebSocket: 客户端通过长连接实时接收推送。

1.1 接口权限说明

消息中心接口支持两种认证方式:用户认证(JWT Token)和应用认证(签名验证)。不同接口的权限如下:

接口 用户权限 应用权限 说明
POST /messages/ ✅ 仅 MESSAGE ✅ MESSAGE + NOTIFICATION + BROADCAST 用户只能发私信,应用可发通知和广播
GET /messages/conversations 仅用户可查询
GET /messages/history/{id} 仅用户可查询
GET /messages/unread-count 仅用户可查询
GET /messages/{id}/callback-url 仅用户可调用,获取通知回调 URL
PUT /messages/{id}/read 仅用户可操作
PUT /messages/read-all 仅用户可操作
DELETE /messages/{id} 仅用户可操作
POST /messages/upload 用户和应用都可上传

2. 用户登录认证 (Auth)

在对接消息中心之前,客户端(如 WebSocket)通常需要获取用户的访问令牌 (Token)。

2.1 用户登录 (OAuth2 表单)

标准 OAuth2 密码模式登录,适用于 Postman 或支持 OAuth2 的客户端。

  • 接口地址: POST https://api.hnyunzhu.com/api/v1/auth/login
  • Content-Type: application/x-www-form-urlencoded

请求参数 (Form Data):

字段 必填 说明
username 用户手机号
password 用户密码

响应示例 (JSON):

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR...",
  "token_type": "bearer"
}

2.2 用户登录 (JSON)

适用于前端 SPA 或移动端应用调用的 JSON 格式登录接口。

  • 接口地址: POST https://api.hnyunzhu.com/api/v1/auth/login/json
  • Content-Type: application/json

请求参数 (JSON):

{
  "mobile": "13800138000",
  "password": "your_password",
  "remember_me": false
}

响应示例 (JSON):

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR...",
  "token_type": "bearer"
}

3. 消息发送接口 (HTTP)

应用端通过 HTTP 接口向指定用户发送消息。

  • 接口地址: POST https://api.hnyunzhu.com/api/v1/messages/
  • 认证方式:
    • 应用调用 (Server-to-Server): 使用应用签名头信息。
    • 用户调用 (Client-to-Server): 使用 Bearer Token。

3.1 应用调用示例 (签名认证)

适用于业务系统后端向用户推送通知。签名生成规则请参考 API 安全规范 (简单来说:sign = HMAC-SHA256(secret, app_id=your_app_id_string&timestamp=1700000000))。

完整 HTTP 请求示例:

POST https://api.hnyunzhu.com/api/v1/messages/ HTTP/1.1
Host: api.yourdomain.com
Content-Type: application/json
X-App-Id: your_app_id_string
X-Timestamp: 1708848000
X-Sign: a1b2c3d4e5f6... (HMAC-SHA256签名)

{
  "app_id": "your_app_id_string",
  "app_user_id": "zhangsan_oa",
  "type": "NOTIFICATION",
  "content_type": "TEXT",
  "title": "OA审批提醒",
  "content": "您有一条新的报销单待审批",
  "auto_sso": true,
  "target_url": "http://oa.com/audit/123",
  "action_text": "立即处理"
}

3.2 用户调用示例 (Token 认证)

适用于用户在前端直接发送私信(如用户 A 发送给用户 B)。

完整 HTTP 请求示例:

POST https://api.hnyunzhu.com/api/v1/messages/ HTTP/1.1
Host: api.yourdomain.com
Content-Type: application/json
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...

{
  "receiver_id": 2048,             // 接收用户 ID
  "type": "MESSAGE",               // 私信
  "content_type": "TEXT",
  "title": "私信",
  "content": "你好,请问这个流程怎么走?"
}

3.3 广播消息接口 (Broadcast)

广播消息接口允许应用向所有活跃用户发送系统通知。适用于系统公告、重要通知等需要全员推送的场景。

重要说明:

  • 仅支持系统通知:广播模式仅支持 type: "NOTIFICATION",不支持私信类型
  • 仅应用可调用:广播功能仅限应用通过签名认证调用,普通用户无权使用
  • 发送给所有活跃用户:系统会自动查找所有状态为 ACTIVE 且未删除的用户,为每个用户创建一条消息记录
  • 实时推送:所有在线用户会通过 WebSocket 实时收到推送

接口地址: POST https://api.hnyunzhu.com/api/v1/messages/

认证方式: 应用签名认证(必须使用应用签名头信息)

请求参数:

字段 类型 必填 说明
is_broadcast boolean 设置为 true 启用广播模式
type string 必须为 "NOTIFICATION"
title string 消息标题
content string 消息内容
content_type string 内容类型,默认为 "TEXT"
auto_sso boolean 是否启用 SSO 自动跳转,默认为 false
target_url string 目标业务页面 URL(当 auto_sso=true 时使用)
action_url string 自定义跳转链接(不使用 SSO 时使用)
action_text string 操作按钮文案

注意: 广播模式下,不需要提供 receiver_idapp_user_id,系统会自动向所有活跃用户发送。

完整 HTTP 请求示例 (应用签名认证):

POST https://api.hnyunzhu.com/api/v1/messages/ HTTP/1.1
Host: api.yourdomain.com
Content-Type: application/json
X-App-Id: your_app_id_string
X-Timestamp: 1708848000
X-Sign: a1b2c3d4e5f6... (HMAC-SHA256签名)

{
  "is_broadcast": true,
  "type": "NOTIFICATION",
  "content_type": "TEXT",
  "title": "系统维护通知",
  "content": "系统将于今晚 22:00-24:00 进行维护,期间可能无法访问,请提前做好准备。",
  "auto_sso": false,
  "action_text": "查看详情"
}

带 SSO 跳转的广播示例:

POST https://api.hnyunzhu.com/api/v1/messages/ HTTP/1.1
Host: api.yourdomain.com
Content-Type: application/json
X-App-Id: your_app_id_string
X-Timestamp: 1708848000
X-Sign: a1b2c3d4e5f6... (HMAC-SHA256签名)

{
  "is_broadcast": true,
  "type": "NOTIFICATION",
  "content_type": "TEXT",
  "title": "重要公告",
  "content": "请查看最新的系统更新说明",
  "auto_sso": true,
  "target_url": "http://your-app.com/announcements/2024-02",
  "action_text": "立即查看"
}

响应示例:

接口返回的是第一条创建的消息记录(用于签名验证),实际所有用户都会收到消息:

{
  "id": 1001,
  "sender_id": null,
  "receiver_id": 1,
  "type": "NOTIFICATION",
  "content_type": "TEXT",
  "title": "系统维护通知",
  "content": "系统将于今晚 22:00-24:00 进行维护...",
  "action_url": null,
  "action_text": "查看详情",
  "is_read": false,
  "created_at": "2026-02-25T10:00:00",
  "app_id": "your_app_id_string",
  "app_name": "您的应用名称"
}

Python 调用示例:

import requests
import time
import hmac
import hashlib

# 配置
API_URL = "https://api.hnyunzhu.com/api/v1/messages/"
APP_ID = "your_app_id_string"
APP_SECRET = "your_app_secret"

def generate_signature(app_id, app_secret):
    """生成签名"""
    timestamp = str(int(time.time()))
    params = {"app_id": app_id, "timestamp": timestamp}
    query_string = "&".join([f"{k}={params[k]}" for k in sorted(params.keys())])
    sign = hmac.new(app_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256).hexdigest()
    return timestamp, sign

timestamp, sign = generate_signature(APP_ID, APP_SECRET)

headers = {
    "X-App-Id": APP_ID,
    "X-Timestamp": timestamp,
    "X-Sign": sign,
    "Content-Type": "application/json"
}

# 广播消息
payload = {
    "is_broadcast": True,  # 启用广播模式
    "type": "NOTIFICATION",  # 必须是 NOTIFICATION
    "content_type": "TEXT",
    "title": "系统维护通知",
    "content": "系统将于今晚 22:00-24:00 进行维护,期间可能无法访问。",
    "auto_sso": False,
    "action_text": "查看详情"
}

response = requests.post(API_URL, json=payload, headers=headers)
print(f"Status: {response.status_code}")
print(f"Response: {response.json()}")

注意事项:

  1. 权限限制:只有通过应用签名认证的请求才能使用广播功能,普通用户 Token 认证无法使用
  2. 消息类型限制:广播仅支持 NOTIFICATION 类型,不支持 MESSAGE 类型
  3. 用户范围:广播会发送给所有状态为 ACTIVEis_deleted=0 的用户
  4. 性能考虑:如果用户数量很大,广播操作可能需要一些时间,建议在后台异步处理
  5. 实时推送:所有在线用户会通过 WebSocket 实时收到推送,离线用户可以在下次登录时查看
  6. 消息记录:每个用户都会收到一条独立的消息记录,可以单独标记已读或删除

3.4 如何获取接收者ID (receiver_id) 和应用ID (app_id)

在实际开发中,通常不会直接记住用户ID和应用ID,而是通过查询接口先找到对应对象,再取出其 ID。

3.3.1 通过用户查询接口获取 receiver_id

可通过用户搜索接口按手机号 / 姓名 / 英文名查询用户,然后从结果中读取 id 作为 receiver_id

  • 接口地址: GET https://api.hnyunzhu.com/api/v1/users/search?q=关键词
  • 说明: 仅返回未删除、状态为 ACTIVE 的用户,并自动排除当前操作者本人。

示例:

// 1. 先搜索用户 (按手机号 / 姓名 / 英文名)
GET https://api.hnyunzhu.com/api/v1/users/search?q=13800138000

// 2. 响应示例 (节选)
[
  {
    "id": 2048,
    "mobile": "13800138000",
    "name": "张三",
    "english_name": "zhangsan"
  }
]

// 3. 发送消息时使用 id 作为 receiver_id
POST https://api.hnyunzhu.com/api/v1/messages/
Content-Type: application/json
Authorization: Bearer xxx

{
  "receiver_id": 2048,
  "type": "MESSAGE",
  "content_type": "TEXT",
  "title": "私信",
  "content": "你好"
}

3.3.2 分页获取联系人列表

如果需要分页获取联系人列表(如消息中心选择联系人),可以使用以下接口:

接口1:用户搜索接口(推荐用于普通用户)

  • 接口地址: GET https://api.hnyunzhu.com/api/v1/users/search?q=关键词&limit=数量
  • 权限: 所有登录用户可用
  • 特点:
    • 支持关键词搜索(手机号、姓名、英文名)
    • limit 参数(默认20,可调整)
    • 不支持 skip 参数,无法跳过前面的记录
    • 只返回活跃用户(status == "ACTIVE")
    • 自动排除当前用户自己

请求示例:

// 搜索用户(不支持真正的分页)
GET https://api.hnyunzhu.com/api/v1/users/search?q=张三&limit=50
Authorization: Bearer xxx

// 响应示例
[
  {
    "id": 2048,
    "mobile": "13800138000",
    "name": "张三",
    "english_name": "zhangsan"
  },
  {
    "id": 2049,
    "mobile": "13900139000",
    "name": "张三丰",
    "english_name": "zhangsanfeng"
  }
]

接口2:用户列表接口(完整分页,需管理员权限)

  • 接口地址: GET https://api.hnyunzhu.com/api/v1/users/?skip=偏移量&limit=数量&keyword=关键词&status=状态
  • 权限: 仅超级管理员可用(普通用户会返回 403)
  • 特点:
    • 支持完整分页(skiplimit
    • 支持多种筛选条件(status, role, mobile, name, english_name, keyword)
    • 返回格式:{"total": 总数, "items": [用户列表]}

请求示例:

// 分页获取用户列表(需要超级管理员权限)
GET https://api.hnyunzhu.com/api/v1/users/?skip=0&limit=20&keyword=张三&status=ACTIVE
Authorization: Bearer xxx

// 响应示例
{
  "total": 100,
  "items": [
    {
      "id": 2048,
      "mobile": "13800138000",
      "name": "张三",
      "english_name": "zhangsan",
      "status": "ACTIVE"
    }
  ]
}

使用建议:

  • 普通用户场景:使用 GET /users/search 接口,通过 limit 参数控制返回数量(建议设置为 50-100)
  • 管理员场景:使用 GET /users/ 接口,支持完整的分页功能

JavaScript 示例:

// 方案1:普通用户 - 使用搜索接口(限制数量)
const fetchContacts = async (keyword = '', limit = 50) => {
  const res = await api.get('/users/search', {
    params: { q: keyword, limit }
  })
  return res.data
}

// 方案2:超级管理员 - 使用完整分页接口
const fetchContactsPaginated = async (page = 1, pageSize = 20, keyword = '') => {
  const res = await api.get('/users/', {
    params: {
      skip: (page - 1) * pageSize,
      limit: pageSize,
      keyword,
      status: 'ACTIVE'  // 只获取活跃用户
    }
  })
  return {
    users: res.data.items,
    total: res.data.total
  }
}

3.3.3 通过应用列表获取 app_id

如果是“用户调用 + 使用 app_user_id”的方式发送消息,需要在 Body 中同时提供 app_id,可以通过应用列表接口查询:

  • 接口地址: GET https://api.hnyunzhu.com/api/v1/apps/?search=关键字
  • 说明: 支持按应用名称 / app_id 模糊搜索,返回结构中既包含内部自增主键 id(整数),也包含对外使用的 app_id 字段(字符串类型,消息接口使用此字段)。

示例:

// 查询包含“OA”的应用
GET https://api.hnyunzhu.com/api/v1/apps/?search=OA

// 响应示例 (节选)
{
  "total": 1,
  "items": [
    {
      "id": 101,              // 数据库主键 (内部使用)
      "app_id": "oa_system",  // 字符串类型的应用ID (消息接口使用此字段)
      "app_name": "OA系统"
    }
  ]
}

// 用户以 app_user_id 方式发送消息时示例
POST https://api.hnyunzhu.com/api/v1/messages/
Content-Type: application/json
Authorization: Bearer xxx

{
  "app_id": "oa_system",       // 使用 items[0].app_id (字符串类型)
  "app_user_id": "zhangsan_oa",
  "type": "NOTIFICATION",
  "content_type": "TEXT",
  "title": "OA审批提醒",
  "content": "您有一条新的报销单待审批"
}

3.3.4 应用自调用时的 app_id 行为说明

  • 应用通过签名调用接口时:系统会自动根据 X-App-Id 解析出当前应用,并将 message_in.app_id 强制设置为该应用的字符串类型 app_id,Body 中传入的 app_id 会被忽略。
  • 用户调用并使用 app_user_idapp_id 必须在 Body 中显式给出(字符串类型),用于从 app_user_mapping 表中解析真实用户。

4. 消息查询接口

用户端通过以下接口查询和管理消息。

4.1 获取会话列表

获取当前用户的所有会话(类似微信首页的会话列表)。

  • 接口地址: GET https://api.hnyunzhu.com/api/v1/messages/conversations
  • 认证方式: Authorization: Bearer <JWT_TOKEN>
  • 权限: 仅用户可调用

响应示例:

[
  {
    "user_id": 0,
    "username": "System",
    "full_name": "系统通知",
    "unread_count": 5,
    "last_message": "您的密码已重置",
    "last_message_type": "TEXT",
    "updated_at": "2026-02-23T10:05:00"
  },
  {
    "user_id": 102,
    "username": "13800138000",
    "full_name": "李四",
    "unread_count": 0,
    "last_message": "[IMAGE]",
    "last_message_type": "IMAGE",
    "updated_at": "2026-02-22T18:30:00"
  }
]

说明:

  • user_id: 0 表示系统通知会话
  • unread_count 表示该会话的未读消息数
  • last_message 显示最后一条消息内容(多媒体类型显示为 [TYPE]

4.2 获取聊天历史记录

获取与特定用户的聊天记录(支持分页)。

  • 接口地址: GET https://api.hnyunzhu.com/api/v1/messages/history/{other_user_id}
  • 路径参数: other_user_id - 对方用户ID(0 表示系统通知)
  • 查询参数:
    • skip: 分页偏移(默认 0)
    • limit: 每页条数(默认 50)

请求示例:

GET https://api.hnyunzhu.com/api/v1/messages/history/123?skip=0&limit=50
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...

响应示例:

[
  {
    "id": 501,
    "sender_id": 123,
    "receiver_id": 456,
    "type": "MESSAGE",
    "content_type": "TEXT",
    "title": "私信",
    "content": "你好,这是一条消息",
    "is_read": true,
    "created_at": "2026-02-23T10:00:00"
  }
]

4.3 获取未读消息数

获取当前用户的总未读消息数。

  • 接口地址: GET https://api.hnyunzhu.com/api/v1/messages/unread-count
  • 响应: 返回数字,表示未读消息总数

响应示例:

5

4.4 获取消息列表

获取当前用户的所有消息列表(支持分页和筛选)。

  • 接口地址: GET https://api.hnyunzhu.com/api/v1/messages/
  • 查询参数:
    • skip: 分页偏移(默认 0)
    • limit: 每页条数(默认 100)
    • unread_only: 是否只获取未读消息(默认 false)

请求示例:

GET https://api.hnyunzhu.com/api/v1/messages/?skip=0&limit=100&unread_only=false
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...

5. 消息状态管理接口

用于标记消息已读、删除消息等操作。

5.1 标记单条消息已读

  • 接口地址: PUT https://api.hnyunzhu.com/api/v1/messages/{message_id}/read
  • 路径参数: message_id - 消息ID
  • 权限: 只能标记自己接收的消息为已读

请求示例:

PUT https://api.hnyunzhu.com/api/v1/messages/501/read
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...

批量标记已读示例 (JavaScript):

// 获取未读消息ID列表
const unreadIds = messages
  .filter(m => !m.is_read && m.receiver_id === currentUserId)
  .map(m => m.id)

// 批量标记为已读
await Promise.all(
  unreadIds.map(id => api.put(`/messages/${id}/read`))
)

5.2 标记全部消息已读

  • 接口地址: PUT https://api.hnyunzhu.com/api/v1/messages/read-all
  • 响应: 返回更新的消息数量

请求示例:

PUT https://api.hnyunzhu.com/api/v1/messages/read-all
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...

响应:

{
  "updated_count": 10
}

5.3 删除消息

  • 接口地址: DELETE https://api.hnyunzhu.com/api/v1/messages/{message_id}
  • 路径参数: message_id - 消息ID
  • 权限: 只能删除自己接收的消息

请求示例:

DELETE https://api.hnyunzhu.com/api/v1/messages/501
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...

5.4 获取消息回调 URL

用于获取通知消息的 SSO 跳转回调 URL。当用户点击通知消息的操作按钮时,前端应调用此接口获取实时的 callback URL(包含 ticket),然后打开该 URL。

重要说明:

  • 此接口会实时生成 ticket,确保 ticket 不会过期
  • 只有消息的接收者可以调用此接口
  • 仅适用于带有 action_url 的通知消息

  • 接口地址: GET https://api.hnyunzhu.com/api/v1/messages/{message_id}/callback-url

  • 路径参数: message_id - 消息ID

  • 认证方式: Authorization: Bearer <JWT_TOKEN>

  • 权限: 仅用户可调用,且只能获取自己接收的消息

请求示例:

GET https://api.hnyunzhu.com/api/v1/messages/501/callback-url
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...

响应示例:

{
  "callback_url": "http://your-app.com/callback?ticket=abc123&next=http://your-business.com/detail/123"
}

前端调用示例 (JavaScript):

// 处理通知操作按钮点击
const handleNotificationAction = async (message) => {
  if (message.action_url) {
    try {
      // 调用接口获取 callback URL(实时生成 ticket)
      const res = await api.get(`/messages/${message.id}/callback-url`)
      
      if (res.data && res.data.callback_url) {
        // 打开回调 URL
        window.open(res.data.callback_url, '_blank')
      } else {
        console.error('获取跳转链接失败')
      }
    } catch (error) {
      console.error('获取跳转链接失败:', error)
    }
  }
}

工作流程:

  1. 创建消息时:消息的 action_url 存储的是 jump 接口 URL(格式:/api/v1/simple/sso/jump?app_id=xxx&redirect_to=xxx),不包含 ticket
  2. 用户点击按钮时
    • 前端调用 GET /messages/{id}/callback-url 接口
    • 后端实时生成 ticket,并构造最终的 callback URL
    • 返回 JSON 格式的 callback_url
    • 前端打开返回的 callback URL

错误响应:

  • 404: 消息未找到
  • 403: 无权访问此消息(不是消息接收者)
  • 400: 消息没有配置跳转链接或跳转链接格式无效

注意事项:

  • Ticket 有效期为 60 秒,因此必须在用户点击时实时生成
  • 如果消息的 action_url 不是 jump 接口格式,将返回 400 错误
  • 此接口会验证用户身份,确保只有消息接收者可以获取 callback URL

6. 文件上传接口

用于上传图片、视频、文档等附件,上传成功后可用于发送多媒体消息。

  • 接口地址: POST https://api.hnyunzhu.com/api/v1/messages/upload
  • Content-Type: multipart/form-data
  • 认证方式: Authorization: Bearer <JWT_TOKEN>
  • 权限: 用户和应用都可调用

请求参数:

字段 类型 必填 说明
file File 上传的文件(支持图片、视频、文档等)

文件限制:

  • 最大文件大小: 50MB
  • 支持的文件类型: JPEG, PNG, GIF, WebP, MP4, PDF, DOC, DOCX, XLS, XLSX, TXT 等

请求示例 (JavaScript):

const formData = new FormData()
formData.append('file', file)

const uploadRes = await api.post('/messages/upload', formData, {
  headers: { 'Content-Type': 'multipart/form-data' }
})

// 响应示例
{
  "url": "https://minio.example.com/messages/1/2026/02/uuid.jpg",
  "key": "messages/1/2026/02/uuid.jpg",
  "filename": "image.jpg",
  "content_type": "image/jpeg",
  "size": 50200
}

上传后发送消息示例:

// 1. 先上传文件
const uploadRes = await api.post('/messages/upload', formData, {
  headers: { 'Content-Type': 'multipart/form-data' }
})

// 2. 使用返回的 key 发送消息
const payload = {
  receiver_id: 123,
  content: uploadRes.data.key,
  type: 'MESSAGE',
  content_type: 'IMAGE',
  title: '图片'
}

await api.post('/messages/', payload)

7. WebSocket 实时接入

前端客户端通过 WebSocket 连接接收实时推送。

  • 连接地址: ws://YOUR_DOMAIN/api/v1/ws/messages?token=JWT_TOKEN
  • HTTPS 环境: wss://YOUR_DOMAIN/api/v1/ws/messages?token=JWT_TOKEN
  • 心跳机制: 客户端每 30 秒发送 ping,服务端回复 pong
  • 断线重连: 建议客户端实现自动重连机制

连接示例 (JavaScript):

const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
const wsUrl = `${protocol}//${window.location.host}/api/v1/ws/messages?token=${localStorage.getItem('token')}`

const ws = new WebSocket(wsUrl)

// 连接成功
ws.onopen = () => {
  console.log('WebSocket 连接成功')
  
  // 启动心跳(每30秒发送一次)
  setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send('ping')
    }
  }, 30000)
}

// 接收消息
ws.onmessage = (event) => {
  // 心跳响应
  if (event.data === 'pong') {
    return
  }
  
  try {
    const msg = JSON.parse(event.data)
    if (msg.type === 'NEW_MESSAGE') {
      const newMessage = msg.data
      // 处理新消息
      console.log('收到新消息:', newMessage)
    }
  } catch (e) {
    console.error('解析消息失败:', e)
  }
}

// 连接关闭
ws.onclose = () => {
  console.log('WebSocket 连接关闭')
  // 实现重连逻辑
  setTimeout(() => {
    // 重新连接
  }, 3000)
}

// 连接错误
ws.onerror = (error) => {
  console.error('WebSocket 错误:', error)
}

推送消息格式 (Server -> Client):

{
  "type": "NEW_MESSAGE",
  "data": {
    "id": 1024,
    "sender_id": 102,
    "type": "NOTIFICATION",
    "content_type": "TEXT",
    "title": "OA审批提醒",
    "content": "您有一条新的报销单待审批",
    "action_url": "http://api.com/sso/jump?app_id=your_app_id_string&redirect_to=...",
    "action_text": "立即处理",
    "created_at": "2026-02-25T10:00:00"
  }
}

消息字段说明:

字段 说明
id 消息ID
sender_id 发送者ID(null 表示系统通知)
type 消息类型:MESSAGE(私信)或 NOTIFICATION(通知)
content_type 内容类型:TEXT, IMAGE, VIDEO, FILE
title 消息标题
content 消息内容(多媒体类型为预签名URL)
action_url 跳转链接(通知类型通常包含SSO跳转)
action_text 跳转按钮文案
receiver_id 接收者ID(前端用于判断消息归属)

7.1 WebSocket 连接建立与用户识别机制

7.1.1 连接建立流程

服务端识别用户的过程:

  1. 客户端连接时携带 JWT Token

    ws://host/api/v1/ws/messages?token=JWT_TOKEN
    
  2. 服务端验证 Token 并解析用户ID

    • 服务端从 Token 中解析出 user_id(当前登录用户的ID)
    • 验证用户是否存在且有效
  3. 将 WebSocket 连接与用户ID关联存储

    • 服务端使用 ConnectionManager 管理连接
    • 存储格式:{user_id: [WebSocket1, WebSocket2, ...]}
    • 一个用户可以有多个设备同时在线(手机、电脑、平板等)

服务端代码逻辑:

# 1. 验证 Token 并获取用户
user = await get_user_from_token(token, db)  # 从 Token 解析 user_id

# 2. 将连接与用户ID关联
await manager.connect(websocket, user.id)  # user.id 就是当前用户的ID

# 3. ConnectionManager 内部存储
# active_connections[user.id] = [websocket连接]

7.1.2 消息推送机制

服务端如何知道推送给哪个用户:

  1. 消息创建时确定接收者

    • 消息保存到数据库时,receiver_id 字段记录了接收者的用户ID
  2. 根据 receiver_id 查找连接

    • 服务端使用 receiver_id 作为 key,从 active_connections 中查找该用户的所有在线连接
  3. 向所有在线设备推送

    • 如果用户有多个设备在线,所有设备都会收到消息

服务端推送代码逻辑:

# 消息创建后,后台任务推送
background_tasks.add_task(
    manager.send_personal_message, 
    push_payload,           # 消息内容
    final_receiver_id       # 接收者的用户ID(从消息的 receiver_id 字段获取)
)

# ConnectionManager 根据 receiver_id 查找连接
async def send_personal_message(self, message: dict, user_id: int):
    """
    向特定用户的所有在线设备推送消息
    user_id 就是消息的 receiver_id
    """
    if user_id in self.active_connections:
        # 找到该用户的所有在线连接(可能多个设备)
        connections = self.active_connections[user_id][:]
        for connection in connections:
            await connection.send_json(message)  # 推送到每个设备

关键点:

  • 连接时:通过 Token 解析出当前用户的 user_id,将连接存储到 active_connections[user_id]
  • 推送时:使用消息的 receiver_id 作为 key,从 active_connections[receiver_id] 中查找连接并推送
  • 多设备支持:一个用户多个设备在线时,所有设备都会收到消息

7.2 前端接收消息与更新聊天窗口

7.2.1 消息接收处理

前端如何判断消息是否属于当前用户:

前端通过 WebSocket 接收到的消息中,服务端已经根据 receiver_id 进行了路由,所以收到的消息都是发给当前用户的。前端需要判断的是:

  1. 消息是否属于当前打开的聊天窗口
  2. 消息是别人发给我的,还是我自己从其他设备发送的

消息处理逻辑:

ws.onmessage = (event) => {
  if (event.data === 'pong') return  // 心跳响应,忽略
  
  try {
    const msg = JSON.parse(event.data)
    if (msg.type === 'NEW_MESSAGE') {
      const newMessage = msg.data
      handleNewMessage(newMessage)
    }
  } catch (e) {
    console.error('解析消息失败:', e)
  }
}

const handleNewMessage = (newMessage) => {
  const currentUserId = currentUserId.value  // 当前登录用户ID
  const currentChatId = currentChatId.value  // 当前打开的聊天窗口的用户ID
  
  // 情况1:收到的是当前聊天窗口的消息
  // - 对方发给我:sender_id !== currentUserId && 当前窗口是 sender_id
  // - 我发给对方(多设备同步):sender_id === currentUserId && 当前窗口是 receiver_id
  if (
    (newMessage.sender_id !== currentUserId && currentChatId === newMessage.sender_id) ||
    (newMessage.sender_id === currentUserId && currentChatId === newMessage.receiver_id)
  ) {
    // 直接添加到当前聊天窗口的消息列表
    messages.value.push(newMessage)
    scrollToBottom()  // 滚动到底部显示新消息
  }
  
  // 情况2:收到的是其他会话的消息
  // 更新会话列表的预览和未读数
  updateConversationPreview(
    newMessage.sender_id === currentUserId 
      ? newMessage.receiver_id   // 我发送的,更新接收者会话
      : newMessage.sender_id,     // 我接收的,更新发送者会话
    newMessage.content,
    newMessage.content_type
  )
  
  // 情况3:如果消息不是当前聊天窗口的,且是别人发给我的
  if (newMessage.sender_id !== currentUserId && currentChatId !== newMessage.sender_id) {
    // 增加未读数
    const conv = conversations.value.find(c => c.user_id === newMessage.sender_id)
    if (conv) {
      conv.unread_count = (conv.unread_count || 0) + 1
    }
  }
}

7.2.2 更新会话列表

更新会话列表的预览信息:

const updateConversationPreview = (userId, content, type) => {
  // 找到或创建会话
  let conv = conversations.value.find(c => c.user_id === userId)
  
  if (conv) {
    // 更新最后一条消息
    conv.last_message = type === 'TEXT' ? content : `[${type}]`
    conv.last_message_type = type
    conv.updated_at = new Date().toISOString()
    
    // 将会话移到最前面(最新消息在顶部)
    conversations.value = [
      conv, 
      ...conversations.value.filter(c => c.user_id !== userId)
    ]
  } else {
    // 新会话,重新获取会话列表
    fetchConversations()
  }
}

7.2.3 完整的前端消息处理示例

完整的消息中心实现:

// 初始化 WebSocket
const initWebSocket = () => {
  if (!currentUser.value) return
  
  const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
  const wsUrl = `${protocol}//${window.location.host}/api/v1/ws/messages?token=${localStorage.getItem('token')}`
  
  const ws = new WebSocket(wsUrl)
  
  // 连接成功
  ws.onopen = () => {
    console.log('WebSocket 连接成功')
    
    // 启动心跳(每30秒发送一次)
    setInterval(() => {
      if (ws.readyState === WebSocket.OPEN) {
        ws.send('ping')
      }
    }, 30000)
  }
  
  // 接收消息
  ws.onmessage = (event) => {
    if (event.data === 'pong') return
    
    try {
      const msg = JSON.parse(event.data)
      if (msg.type === 'NEW_MESSAGE') {
        handleNewMessage(msg.data)
      }
    } catch (e) {
      console.error('解析消息失败:', e)
    }
  }
  
  // 连接关闭
  ws.onclose = () => {
    console.log('WebSocket 连接关闭')
    // 实现重连逻辑
    setTimeout(() => {
      initWebSocket()  // 重新连接
    }, 3000)
  }
  
  // 连接错误
  ws.onerror = (error) => {
    console.error('WebSocket 错误:', error)
  }
}

// 处理新消息
const handleNewMessage = (newMessage) => {
  const currentUserId = currentUserId.value
  const currentChatId = currentChatId.value
  
  // 1. 判断是否属于当前聊天窗口
  const isCurrentChat = 
    (newMessage.sender_id !== currentUserId && currentChatId === newMessage.sender_id) ||
    (newMessage.sender_id === currentUserId && currentChatId === newMessage.receiver_id)
  
  if (isCurrentChat) {
    // 添加到当前窗口的消息列表
    messages.value.push(newMessage)
    scrollToBottom()
    
    // 如果是别人发给我的,标记为已读
    if (newMessage.sender_id !== currentUserId) {
      api.put(`/messages/${newMessage.id}/read`)
    }
  }
  
  // 2. 更新会话列表
  const otherUserId = newMessage.sender_id === currentUserId 
    ? newMessage.receiver_id 
    : newMessage.sender_id
  
  updateConversationPreview(
    otherUserId,
    newMessage.content,
    newMessage.content_type
  )
  
  // 3. 更新未读数(如果不是当前窗口且是别人发给我的)
  if (newMessage.sender_id !== currentUserId && !isCurrentChat) {
    const conv = conversations.value.find(c => c.user_id === newMessage.sender_id)
    if (conv) {
      conv.unread_count = (conv.unread_count || 0) + 1
    }
  }
}

// 更新会话预览
const updateConversationPreview = (userId, content, type) => {
  const conv = conversations.value.find(c => c.user_id === userId)
  
  if (conv) {
    conv.last_message = type === 'TEXT' ? content : `[${type}]`
    conv.last_message_type = type
    conv.updated_at = new Date().toISOString()
    
    // 移到最前面
    conversations.value = [
      conv,
      ...conversations.value.filter(c => c.user_id !== userId)
    ]
  } else {
    // 新会话,重新获取
    fetchConversations()
  }
}

// 滚动到底部
const scrollToBottom = () => {
  nextTick(() => {
    if (scrollContainer.value) {
      scrollContainer.value.scrollTop = scrollContainer.value.scrollHeight
    }
  })
}

7.3 完整流程图

┌─────────────────────────────────────────────────────────────┐
│ 1. 用户A登录 → 获取 JWT Token                               │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 2. 建立 WebSocket 连接                                       │
│    ws://host/ws/messages?token=TOKEN                        │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 3. 服务端验证 Token → 解析出 user_id = A                     │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 4. 存储连接                                                  │
│    active_connections[A] = [WebSocket连接]                   │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 5. 用户B发送消息给用户A                                      │
│    POST /messages/ { receiver_id: A, ... }                   │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 6. 消息保存到数据库                                          │
│    receiver_id = A                                           │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 7. 后台任务推送                                              │
│    manager.send_personal_message(payload, receiver_id=A)     │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 8. 查找连接                                                  │
│    active_connections[A] → 找到用户A的所有连接               │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 9. 推送消息                                                  │
│    用户A的所有设备都收到消息                                 │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 10. 前端接收消息                                             │
│     ws.onmessage → handleNewMessage()                        │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 11. 判断消息类型和当前窗口                                   │
│     - 是否属于当前聊天窗口?                                 │
│     - 是别人发给我的,还是我自己发的?                       │
└─────────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 12. 更新UI                                                   │
│     - 更新消息列表(如果当前窗口)                           │
│     - 更新会话列表(最后一条消息、未读数)                   │
│     - 滚动到底部(如果当前窗口)                             │
└─────────────────────────────────────────────────────────────┘

7.4 关键设计点总结

  1. 用户识别:连接时通过 JWT Token 解析 user_id,推送时使用 receiver_id 查找连接
  2. 多设备支持:一个 user_id 可以对应多个 WebSocket 连接,所有设备都会收到消息
  3. 消息路由:服务端根据 receiver_id 自动路由到正确的用户,前端只需判断是否属于当前窗口
  4. 实时更新:收到消息后自动更新消息列表、会话列表、未读数,无需手动刷新

8. 前端完整调用示例

以下示例展示前端如何完整地使用消息中心功能。

8.1 初始化消息中心

// 1. 页面加载时获取会话列表
onMounted(() => {
  fetchConversations()
})

// 2. 获取会话列表
const fetchConversations = async () => {
  try {
    const res = await api.get('/messages/conversations')
    conversations.value = res.data
    initWebSocket()  // 初始化 WebSocket
  } catch (e) {
    console.error('获取会话列表失败:', e)
  }
}

8.2 选择会话并加载历史消息

const selectChat = async (chat) => {
  currentChatId.value = chat.user_id
  await loadHistory(chat.user_id)
}

const loadHistory = async (userId) => {
  try {
    const res = await api.get(`/messages/history/${userId}`, {
      params: { skip: 0, limit: 50 }
    })
    messages.value = res.data.reverse()  // API返回最新在前,需要反转显示
    
    // 标记未读消息为已读
    const unreadIds = messages.value
      .filter(m => !m.is_read && m.receiver_id === currentUserId.value)
      .map(m => m.id)
    
    if (unreadIds.length > 0) {
      await Promise.all(unreadIds.map(id => api.put(`/messages/${id}/read`)))
    }
  } catch (e) {
    console.error('加载历史消息失败:', e)
  }
}

8.3 发送文本消息

const sendMessage = async () => {
  if (!inputMessage.value.trim() || !currentChatId.value) return
  
  const payload = {
    receiver_id: currentChatId.value,
    content: inputMessage.value,
    type: 'MESSAGE',
    content_type: 'TEXT',
    title: '私信'
  }
  
  try {
    const res = await api.post('/messages/', payload)
    messages.value.push(res.data)
    inputMessage.value = ''
  } catch (e) {
    ElMessage.error('发送失败')
  }
}

8.4 上传文件并发送

const handleUpload = async (options) => {
  const formData = new FormData()
  formData.append('file', options.file)
  
  try {
    // 1. 先上传文件
    const uploadRes = await api.post('/messages/upload', formData, {
      headers: { 'Content-Type': 'multipart/form-data' }
    })
    
    // 2. 再发送消息
    const payload = {
      receiver_id: currentChatId.value,
      content: uploadRes.data.key,
      type: 'MESSAGE',
      content_type: 'IMAGE',
      title: '图片'
    }
    
    const res = await api.post('/messages/', payload)
    messages.value.push(res.data)
  } catch (e) {
    ElMessage.error('上传失败')
  }
}

9. 调用示例 (Python)

以下示例展示如何使用 Python 发送通知。

import requests
import time
import hmac
import hashlib

# 配置
API_URL = "https://api.hnyunzhu.com/api/v1/messages/"
APP_ID = "your_app_id_string"  # 字符串类型的 app_id
APP_SECRET = "your_app_secret"

def generate_signature(app_id, app_secret):
    """生成签名"""
    timestamp = str(int(time.time()))
    params = {"app_id": app_id, "timestamp": timestamp}
    query_string = "&".join([f"{k}={params[k]}" for k in sorted(params.keys())])
    sign = hmac.new(app_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256).hexdigest()
    return timestamp, sign

timestamp, sign = generate_signature(APP_ID, APP_SECRET)

# Debug: 打印签名信息
query_string = "&".join([f"{k}={v}" for k, v in sorted({"app_id": APP_ID, "timestamp": timestamp}.items())])
print(f"Debug - Query string for signature: {query_string}")
print(f"Debug - Generated signature: {sign}")
print(f"Debug - Timestamp: {timestamp}")

headers = {
    "X-App-Id": APP_ID,
    "X-Timestamp": timestamp,
    "X-Sign": sign,
    "Content-Type": "application/json"
}

payload = {
    "app_id": APP_ID,  
    "app_user_id": "admin",
    "type": "NOTIFICATION",
    "content_type": "TEXT",
    "title": "测试通知",
    "content": "这是一条测试通知消息2",
    "auto_sso": True,  # 开启自动 SSO
    "target_url": "http://your-business-system.com/detail/123",  # 最终要跳转的业务页面
    "action_text": "查看详情"
}

print(f"Debug - Request headers: {headers}")
print(f"Debug - Request payload: {payload}")

resp = requests.post(API_URL, json=payload, headers=headers)
print(f"Status: {resp.status_code}")
print(f"Response Headers: {dict(resp.headers)}")

# Safely handle response - check if it's JSON
try:
    if resp.text:
        print(f"Response Text: {resp.text}")
        try:
            print(f"Response JSON: {resp.json()}")
        except requests.exceptions.JSONDecodeError:
            print("Response is not valid JSON")
    else:
        print("Response body is empty")
except Exception as e:
    print(f"Error reading response: {e}")