统一消息中心支持应用向用户发送系统通知,并支持用户通过 WebSocket 实时接收消息。本指南将指导您如何接入消息发送与推送服务。
消息中心接口支持两种认证方式:用户认证(JWT Token)和应用认证(签名验证)。不同接口的权限如下:
| 接口 | 用户权限 | 应用权限 | 说明 |
|---|---|---|---|
POST /messages/ |
✅ 仅 MESSAGE | ✅ MESSAGE + NOTIFICATION + BROADCAST | 用户只能发私信,应用可发通知和广播 |
GET /messages/conversations |
✅ | ❌ | 仅用户可查询 |
GET /messages/history/{id} |
✅ | ❌ | 仅用户可查询 |
GET /messages/unread-count |
✅ | ❌ | 仅用户可查询 |
GET /messages/{id}/callback-url |
✅ | ❌ | 仅用户可调用,获取通知回调 URL |
PUT /messages/{id}/read |
✅ | ❌ | 仅用户可操作 |
PUT /messages/read-all |
✅ | ❌ | 仅用户可操作 |
DELETE /messages/{id} |
✅ | ❌ | 仅用户可操作 |
POST /messages/upload |
✅ | ✅ | 用户和应用都可上传 |
在对接消息中心之前,客户端(如 WebSocket)通常需要获取用户的访问令牌 (Token)。
标准 OAuth2 密码模式登录,适用于 Postman 或支持 OAuth2 的客户端。
POST https://api.hnyunzhu.com/api/v1/auth/loginapplication/x-www-form-urlencoded请求参数 (Form Data):
| 字段 | 必填 | 说明 |
|---|---|---|
username |
是 | 用户手机号 |
password |
是 | 用户密码 |
响应示例 (JSON):
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR...",
"token_type": "bearer"
}
适用于前端 SPA 或移动端应用调用的 JSON 格式登录接口。
POST https://api.hnyunzhu.com/api/v1/auth/login/jsonapplication/json请求参数 (JSON):
{
"mobile": "13800138000",
"password": "your_password",
"remember_me": false
}
响应示例 (JSON):
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR...",
"token_type": "bearer"
}
应用端通过 HTTP 接口向指定用户发送消息。
POST https://api.hnyunzhu.com/api/v1/messages/适用于业务系统后端向用户推送通知。签名生成规则请参考 API 安全规范 (简单来说:sign = HMAC-SHA256(secret, app_id=your_app_id_string×tamp=1700000000))。
完整 HTTP 请求示例:
POST https://api.hnyunzhu.com/api/v1/messages/ HTTP/1.1
Host: api.yourdomain.com
Content-Type: application/json
X-App-Id: your_app_id_string
X-Timestamp: 1708848000
X-Sign: a1b2c3d4e5f6... (HMAC-SHA256签名)
{
"app_id": "your_app_id_string",
"app_user_id": "zhangsan_oa",
"type": "NOTIFICATION",
"content_type": "TEXT",
"title": "OA审批提醒",
"content": "您有一条新的报销单待审批",
"auto_sso": true,
"target_url": "http://oa.com/audit/123",
"action_text": "立即处理"
}
适用于用户在前端直接发送私信(如用户 A 发送给用户 B)。
完整 HTTP 请求示例:
POST https://api.hnyunzhu.com/api/v1/messages/ HTTP/1.1
Host: api.yourdomain.com
Content-Type: application/json
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...
{
"receiver_id": 2048, // 接收用户 ID
"type": "MESSAGE", // 私信
"content_type": "TEXT",
"title": "私信",
"content": "你好,请问这个流程怎么走?"
}
广播消息接口允许应用向所有活跃用户发送系统通知。适用于系统公告、重要通知等需要全员推送的场景。
重要说明:
type: "NOTIFICATION",不支持私信类型ACTIVE 且未删除的用户,为每个用户创建一条消息记录接口地址: POST https://api.hnyunzhu.com/api/v1/messages/
认证方式: 应用签名认证(必须使用应用签名头信息)
请求参数:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
is_broadcast |
boolean | 是 | 设置为 true 启用广播模式 |
type |
string | 是 | 必须为 "NOTIFICATION" |
title |
string | 是 | 消息标题 |
content |
string | 是 | 消息内容 |
content_type |
string | 否 | 内容类型,默认为 "TEXT" |
auto_sso |
boolean | 否 | 是否启用 SSO 自动跳转,默认为 false |
target_url |
string | 否 | 目标业务页面 URL(当 auto_sso=true 时使用) |
action_url |
string | 否 | 自定义跳转链接(不使用 SSO 时使用) |
action_text |
string | 否 | 操作按钮文案 |
注意: 广播模式下,不需要提供 receiver_id 或 app_user_id,系统会自动向所有活跃用户发送。
完整 HTTP 请求示例 (应用签名认证):
POST https://api.hnyunzhu.com/api/v1/messages/ HTTP/1.1
Host: api.yourdomain.com
Content-Type: application/json
X-App-Id: your_app_id_string
X-Timestamp: 1708848000
X-Sign: a1b2c3d4e5f6... (HMAC-SHA256签名)
{
"is_broadcast": true,
"type": "NOTIFICATION",
"content_type": "TEXT",
"title": "系统维护通知",
"content": "系统将于今晚 22:00-24:00 进行维护,期间可能无法访问,请提前做好准备。",
"auto_sso": false,
"action_text": "查看详情"
}
带 SSO 跳转的广播示例:
POST https://api.hnyunzhu.com/api/v1/messages/ HTTP/1.1
Host: api.yourdomain.com
Content-Type: application/json
X-App-Id: your_app_id_string
X-Timestamp: 1708848000
X-Sign: a1b2c3d4e5f6... (HMAC-SHA256签名)
{
"is_broadcast": true,
"type": "NOTIFICATION",
"content_type": "TEXT",
"title": "重要公告",
"content": "请查看最新的系统更新说明",
"auto_sso": true,
"target_url": "http://your-app.com/announcements/2024-02",
"action_text": "立即查看"
}
响应示例:
接口返回的是第一条创建的消息记录(用于签名验证),实际所有用户都会收到消息:
{
"id": 1001,
"sender_id": null,
"receiver_id": 1,
"type": "NOTIFICATION",
"content_type": "TEXT",
"title": "系统维护通知",
"content": "系统将于今晚 22:00-24:00 进行维护...",
"action_url": null,
"action_text": "查看详情",
"is_read": false,
"created_at": "2026-02-25T10:00:00",
"app_id": "your_app_id_string",
"app_name": "您的应用名称"
}
Python 调用示例:
import requests
import time
import hmac
import hashlib
# 配置
API_URL = "https://api.hnyunzhu.com/api/v1/messages/"
APP_ID = "your_app_id_string"
APP_SECRET = "your_app_secret"
def generate_signature(app_id, app_secret):
"""生成签名"""
timestamp = str(int(time.time()))
params = {"app_id": app_id, "timestamp": timestamp}
query_string = "&".join([f"{k}={params[k]}" for k in sorted(params.keys())])
sign = hmac.new(app_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256).hexdigest()
return timestamp, sign
timestamp, sign = generate_signature(APP_ID, APP_SECRET)
headers = {
"X-App-Id": APP_ID,
"X-Timestamp": timestamp,
"X-Sign": sign,
"Content-Type": "application/json"
}
# 广播消息
payload = {
"is_broadcast": True, # 启用广播模式
"type": "NOTIFICATION", # 必须是 NOTIFICATION
"content_type": "TEXT",
"title": "系统维护通知",
"content": "系统将于今晚 22:00-24:00 进行维护,期间可能无法访问。",
"auto_sso": False,
"action_text": "查看详情"
}
response = requests.post(API_URL, json=payload, headers=headers)
print(f"Status: {response.status_code}")
print(f"Response: {response.json()}")
注意事项:
NOTIFICATION 类型,不支持 MESSAGE 类型ACTIVE 且 is_deleted=0 的用户在实际开发中,通常不会直接记住用户ID和应用ID,而是通过查询接口先找到对应对象,再取出其 ID。
可通过用户搜索接口按手机号 / 姓名 / 英文名查询用户,然后从结果中读取 id 作为 receiver_id:
GET https://api.hnyunzhu.com/api/v1/users/search?q=关键词ACTIVE 的用户,并自动排除当前操作者本人。示例:
// 1. 先搜索用户 (按手机号 / 姓名 / 英文名)
GET https://api.hnyunzhu.com/api/v1/users/search?q=13800138000
// 2. 响应示例 (节选)
[
{
"id": 2048,
"mobile": "13800138000",
"name": "张三",
"english_name": "zhangsan"
}
]
// 3. 发送消息时使用 id 作为 receiver_id
POST https://api.hnyunzhu.com/api/v1/messages/
Content-Type: application/json
Authorization: Bearer xxx
{
"receiver_id": 2048,
"type": "MESSAGE",
"content_type": "TEXT",
"title": "私信",
"content": "你好"
}
如果需要分页获取联系人列表(如消息中心选择联系人),可以使用以下接口:
接口1:用户搜索接口(推荐用于普通用户)
GET https://api.hnyunzhu.com/api/v1/users/search?q=关键词&limit=数量limit 参数(默认20,可调整)skip 参数,无法跳过前面的记录请求示例:
// 搜索用户(不支持真正的分页)
GET https://api.hnyunzhu.com/api/v1/users/search?q=张三&limit=50
Authorization: Bearer xxx
// 响应示例
[
{
"id": 2048,
"mobile": "13800138000",
"name": "张三",
"english_name": "zhangsan"
},
{
"id": 2049,
"mobile": "13900139000",
"name": "张三丰",
"english_name": "zhangsanfeng"
}
]
接口2:用户列表接口(完整分页,需管理员权限)
GET https://api.hnyunzhu.com/api/v1/users/?skip=偏移量&limit=数量&keyword=关键词&status=状态skip 和 limit){"total": 总数, "items": [用户列表]}请求示例:
// 分页获取用户列表(需要超级管理员权限)
GET https://api.hnyunzhu.com/api/v1/users/?skip=0&limit=20&keyword=张三&status=ACTIVE
Authorization: Bearer xxx
// 响应示例
{
"total": 100,
"items": [
{
"id": 2048,
"mobile": "13800138000",
"name": "张三",
"english_name": "zhangsan",
"status": "ACTIVE"
}
]
}
使用建议:
GET /users/search 接口,通过 limit 参数控制返回数量(建议设置为 50-100)GET /users/ 接口,支持完整的分页功能JavaScript 示例:
// 方案1:普通用户 - 使用搜索接口(限制数量)
const fetchContacts = async (keyword = '', limit = 50) => {
const res = await api.get('/users/search', {
params: { q: keyword, limit }
})
return res.data
}
// 方案2:超级管理员 - 使用完整分页接口
const fetchContactsPaginated = async (page = 1, pageSize = 20, keyword = '') => {
const res = await api.get('/users/', {
params: {
skip: (page - 1) * pageSize,
limit: pageSize,
keyword,
status: 'ACTIVE' // 只获取活跃用户
}
})
return {
users: res.data.items,
total: res.data.total
}
}
如果是“用户调用 + 使用 app_user_id”的方式发送消息,需要在 Body 中同时提供 app_id,可以通过应用列表接口查询:
GET https://api.hnyunzhu.com/api/v1/apps/?search=关键字app_id 模糊搜索,返回结构中既包含内部自增主键 id(整数),也包含对外使用的 app_id 字段(字符串类型,消息接口使用此字段)。示例:
// 查询包含“OA”的应用
GET https://api.hnyunzhu.com/api/v1/apps/?search=OA
// 响应示例 (节选)
{
"total": 1,
"items": [
{
"id": 101, // 数据库主键 (内部使用)
"app_id": "oa_system", // 字符串类型的应用ID (消息接口使用此字段)
"app_name": "OA系统"
}
]
}
// 用户以 app_user_id 方式发送消息时示例
POST https://api.hnyunzhu.com/api/v1/messages/
Content-Type: application/json
Authorization: Bearer xxx
{
"app_id": "oa_system", // 使用 items[0].app_id (字符串类型)
"app_user_id": "zhangsan_oa",
"type": "NOTIFICATION",
"content_type": "TEXT",
"title": "OA审批提醒",
"content": "您有一条新的报销单待审批"
}
X-App-Id 解析出当前应用,并将 message_in.app_id 强制设置为该应用的字符串类型 app_id,Body 中传入的 app_id 会被忽略。app_user_id 时:app_id 必须在 Body 中显式给出(字符串类型),用于从 app_user_mapping 表中解析真实用户。用户端通过以下接口查询和管理消息。
获取当前用户的所有会话(类似微信首页的会话列表)。
GET https://api.hnyunzhu.com/api/v1/messages/conversationsAuthorization: Bearer <JWT_TOKEN>响应示例:
[
{
"user_id": 0,
"username": "System",
"full_name": "系统通知",
"unread_count": 5,
"last_message": "您的密码已重置",
"last_message_type": "TEXT",
"updated_at": "2026-02-23T10:05:00"
},
{
"user_id": 102,
"username": "13800138000",
"full_name": "李四",
"unread_count": 0,
"last_message": "[IMAGE]",
"last_message_type": "IMAGE",
"updated_at": "2026-02-22T18:30:00"
}
]
说明:
user_id: 0 表示系统通知会话unread_count 表示该会话的未读消息数last_message 显示最后一条消息内容(多媒体类型显示为 [TYPE])获取与特定用户的聊天记录(支持分页)。
GET https://api.hnyunzhu.com/api/v1/messages/history/{other_user_id}other_user_id - 对方用户ID(0 表示系统通知)skip: 分页偏移(默认 0)limit: 每页条数(默认 50)请求示例:
GET https://api.hnyunzhu.com/api/v1/messages/history/123?skip=0&limit=50
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...
响应示例:
[
{
"id": 501,
"sender_id": 123,
"receiver_id": 456,
"type": "MESSAGE",
"content_type": "TEXT",
"title": "私信",
"content": "你好,这是一条消息",
"is_read": true,
"created_at": "2026-02-23T10:00:00"
}
]
获取当前用户的总未读消息数。
GET https://api.hnyunzhu.com/api/v1/messages/unread-count响应示例:
5
获取当前用户的所有消息列表(支持分页和筛选)。
GET https://api.hnyunzhu.com/api/v1/messages/skip: 分页偏移(默认 0)limit: 每页条数(默认 100)unread_only: 是否只获取未读消息(默认 false)请求示例:
GET https://api.hnyunzhu.com/api/v1/messages/?skip=0&limit=100&unread_only=false
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...
用于标记消息已读、删除消息等操作。
PUT https://api.hnyunzhu.com/api/v1/messages/{message_id}/readmessage_id - 消息ID请求示例:
PUT https://api.hnyunzhu.com/api/v1/messages/501/read
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...
批量标记已读示例 (JavaScript):
// 获取未读消息ID列表
const unreadIds = messages
.filter(m => !m.is_read && m.receiver_id === currentUserId)
.map(m => m.id)
// 批量标记为已读
await Promise.all(
unreadIds.map(id => api.put(`/messages/${id}/read`))
)
PUT https://api.hnyunzhu.com/api/v1/messages/read-all请求示例:
PUT https://api.hnyunzhu.com/api/v1/messages/read-all
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...
响应:
{
"updated_count": 10
}
DELETE https://api.hnyunzhu.com/api/v1/messages/{message_id}message_id - 消息ID请求示例:
DELETE https://api.hnyunzhu.com/api/v1/messages/501
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...
用于获取通知消息的 SSO 跳转回调 URL。当用户点击通知消息的操作按钮时,前端应调用此接口获取实时的 callback URL(包含 ticket),然后打开该 URL。
重要说明:
仅适用于带有 action_url 的通知消息
接口地址: GET https://api.hnyunzhu.com/api/v1/messages/{message_id}/callback-url
路径参数: message_id - 消息ID
认证方式: Authorization: Bearer <JWT_TOKEN>
权限: 仅用户可调用,且只能获取自己接收的消息
请求示例:
GET https://api.hnyunzhu.com/api/v1/messages/501/callback-url
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR...
响应示例:
{
"callback_url": "http://your-app.com/callback?ticket=abc123&next=http://your-business.com/detail/123"
}
前端调用示例 (JavaScript):
// 处理通知操作按钮点击
const handleNotificationAction = async (message) => {
if (message.action_url) {
try {
// 调用接口获取 callback URL(实时生成 ticket)
const res = await api.get(`/messages/${message.id}/callback-url`)
if (res.data && res.data.callback_url) {
// 打开回调 URL
window.open(res.data.callback_url, '_blank')
} else {
console.error('获取跳转链接失败')
}
} catch (error) {
console.error('获取跳转链接失败:', error)
}
}
}
工作流程:
action_url 存储的是 jump 接口 URL(格式:/api/v1/simple/sso/jump?app_id=xxx&redirect_to=xxx),不包含 ticketGET /messages/{id}/callback-url 接口callback_url错误响应:
404: 消息未找到403: 无权访问此消息(不是消息接收者)400: 消息没有配置跳转链接或跳转链接格式无效注意事项:
action_url 不是 jump 接口格式,将返回 400 错误用于上传图片、视频、文档等附件,上传成功后可用于发送多媒体消息。
POST https://api.hnyunzhu.com/api/v1/messages/uploadmultipart/form-dataAuthorization: Bearer <JWT_TOKEN>请求参数:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
file |
File | 是 | 上传的文件(支持图片、视频、文档等) |
文件限制:
请求示例 (JavaScript):
const formData = new FormData()
formData.append('file', file)
const uploadRes = await api.post('/messages/upload', formData, {
headers: { 'Content-Type': 'multipart/form-data' }
})
// 响应示例
{
"url": "https://minio.example.com/messages/1/2026/02/uuid.jpg",
"key": "messages/1/2026/02/uuid.jpg",
"filename": "image.jpg",
"content_type": "image/jpeg",
"size": 50200
}
上传后发送消息示例:
// 1. 先上传文件
const uploadRes = await api.post('/messages/upload', formData, {
headers: { 'Content-Type': 'multipart/form-data' }
})
// 2. 使用返回的 key 发送消息
const payload = {
receiver_id: 123,
content: uploadRes.data.key,
type: 'MESSAGE',
content_type: 'IMAGE',
title: '图片'
}
await api.post('/messages/', payload)
前端客户端通过 WebSocket 连接接收实时推送。
ws://YOUR_DOMAIN/api/v1/ws/messages?token=JWT_TOKENwss://YOUR_DOMAIN/api/v1/ws/messages?token=JWT_TOKENping,服务端回复 pong连接示例 (JavaScript):
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
const wsUrl = `${protocol}//${window.location.host}/api/v1/ws/messages?token=${localStorage.getItem('token')}`
const ws = new WebSocket(wsUrl)
// 连接成功
ws.onopen = () => {
console.log('WebSocket 连接成功')
// 启动心跳(每30秒发送一次)
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send('ping')
}
}, 30000)
}
// 接收消息
ws.onmessage = (event) => {
// 心跳响应
if (event.data === 'pong') {
return
}
try {
const msg = JSON.parse(event.data)
if (msg.type === 'NEW_MESSAGE') {
const newMessage = msg.data
// 处理新消息
console.log('收到新消息:', newMessage)
}
} catch (e) {
console.error('解析消息失败:', e)
}
}
// 连接关闭
ws.onclose = () => {
console.log('WebSocket 连接关闭')
// 实现重连逻辑
setTimeout(() => {
// 重新连接
}, 3000)
}
// 连接错误
ws.onerror = (error) => {
console.error('WebSocket 错误:', error)
}
推送消息格式 (Server -> Client):
{
"type": "NEW_MESSAGE",
"data": {
"id": 1024,
"sender_id": 102,
"type": "NOTIFICATION",
"content_type": "TEXT",
"title": "OA审批提醒",
"content": "您有一条新的报销单待审批",
"action_url": "http://api.com/sso/jump?app_id=your_app_id_string&redirect_to=...",
"action_text": "立即处理",
"created_at": "2026-02-25T10:00:00"
}
}
消息字段说明:
| 字段 | 说明 |
|---|---|
id |
消息ID |
sender_id |
发送者ID(null 表示系统通知) |
type |
消息类型:MESSAGE(私信)或 NOTIFICATION(通知) |
content_type |
内容类型:TEXT, IMAGE, VIDEO, FILE |
title |
消息标题 |
content |
消息内容(多媒体类型为预签名URL) |
action_url |
跳转链接(通知类型通常包含SSO跳转) |
action_text |
跳转按钮文案 |
receiver_id |
接收者ID(前端用于判断消息归属) |
服务端识别用户的过程:
客户端连接时携带 JWT Token
ws://host/api/v1/ws/messages?token=JWT_TOKEN
服务端验证 Token 并解析用户ID
user_id(当前登录用户的ID)将 WebSocket 连接与用户ID关联存储
ConnectionManager 管理连接{user_id: [WebSocket1, WebSocket2, ...]}服务端代码逻辑:
# 1. 验证 Token 并获取用户
user = await get_user_from_token(token, db) # 从 Token 解析 user_id
# 2. 将连接与用户ID关联
await manager.connect(websocket, user.id) # user.id 就是当前用户的ID
# 3. ConnectionManager 内部存储
# active_connections[user.id] = [websocket连接]
服务端如何知道推送给哪个用户:
消息创建时确定接收者
receiver_id 字段记录了接收者的用户ID根据 receiver_id 查找连接
receiver_id 作为 key,从 active_connections 中查找该用户的所有在线连接向所有在线设备推送
服务端推送代码逻辑:
# 消息创建后,后台任务推送
background_tasks.add_task(
manager.send_personal_message,
push_payload, # 消息内容
final_receiver_id # 接收者的用户ID(从消息的 receiver_id 字段获取)
)
# ConnectionManager 根据 receiver_id 查找连接
async def send_personal_message(self, message: dict, user_id: int):
"""
向特定用户的所有在线设备推送消息
user_id 就是消息的 receiver_id
"""
if user_id in self.active_connections:
# 找到该用户的所有在线连接(可能多个设备)
connections = self.active_connections[user_id][:]
for connection in connections:
await connection.send_json(message) # 推送到每个设备
关键点:
user_id,将连接存储到 active_connections[user_id]receiver_id 作为 key,从 active_connections[receiver_id] 中查找连接并推送前端如何判断消息是否属于当前用户:
前端通过 WebSocket 接收到的消息中,服务端已经根据 receiver_id 进行了路由,所以收到的消息都是发给当前用户的。前端需要判断的是:
消息处理逻辑:
ws.onmessage = (event) => {
if (event.data === 'pong') return // 心跳响应,忽略
try {
const msg = JSON.parse(event.data)
if (msg.type === 'NEW_MESSAGE') {
const newMessage = msg.data
handleNewMessage(newMessage)
}
} catch (e) {
console.error('解析消息失败:', e)
}
}
const handleNewMessage = (newMessage) => {
const currentUserId = currentUserId.value // 当前登录用户ID
const currentChatId = currentChatId.value // 当前打开的聊天窗口的用户ID
// 情况1:收到的是当前聊天窗口的消息
// - 对方发给我:sender_id !== currentUserId && 当前窗口是 sender_id
// - 我发给对方(多设备同步):sender_id === currentUserId && 当前窗口是 receiver_id
if (
(newMessage.sender_id !== currentUserId && currentChatId === newMessage.sender_id) ||
(newMessage.sender_id === currentUserId && currentChatId === newMessage.receiver_id)
) {
// 直接添加到当前聊天窗口的消息列表
messages.value.push(newMessage)
scrollToBottom() // 滚动到底部显示新消息
}
// 情况2:收到的是其他会话的消息
// 更新会话列表的预览和未读数
updateConversationPreview(
newMessage.sender_id === currentUserId
? newMessage.receiver_id // 我发送的,更新接收者会话
: newMessage.sender_id, // 我接收的,更新发送者会话
newMessage.content,
newMessage.content_type
)
// 情况3:如果消息不是当前聊天窗口的,且是别人发给我的
if (newMessage.sender_id !== currentUserId && currentChatId !== newMessage.sender_id) {
// 增加未读数
const conv = conversations.value.find(c => c.user_id === newMessage.sender_id)
if (conv) {
conv.unread_count = (conv.unread_count || 0) + 1
}
}
}
更新会话列表的预览信息:
const updateConversationPreview = (userId, content, type) => {
// 找到或创建会话
let conv = conversations.value.find(c => c.user_id === userId)
if (conv) {
// 更新最后一条消息
conv.last_message = type === 'TEXT' ? content : `[${type}]`
conv.last_message_type = type
conv.updated_at = new Date().toISOString()
// 将会话移到最前面(最新消息在顶部)
conversations.value = [
conv,
...conversations.value.filter(c => c.user_id !== userId)
]
} else {
// 新会话,重新获取会话列表
fetchConversations()
}
}
完整的消息中心实现:
// 初始化 WebSocket
const initWebSocket = () => {
if (!currentUser.value) return
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
const wsUrl = `${protocol}//${window.location.host}/api/v1/ws/messages?token=${localStorage.getItem('token')}`
const ws = new WebSocket(wsUrl)
// 连接成功
ws.onopen = () => {
console.log('WebSocket 连接成功')
// 启动心跳(每30秒发送一次)
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send('ping')
}
}, 30000)
}
// 接收消息
ws.onmessage = (event) => {
if (event.data === 'pong') return
try {
const msg = JSON.parse(event.data)
if (msg.type === 'NEW_MESSAGE') {
handleNewMessage(msg.data)
}
} catch (e) {
console.error('解析消息失败:', e)
}
}
// 连接关闭
ws.onclose = () => {
console.log('WebSocket 连接关闭')
// 实现重连逻辑
setTimeout(() => {
initWebSocket() // 重新连接
}, 3000)
}
// 连接错误
ws.onerror = (error) => {
console.error('WebSocket 错误:', error)
}
}
// 处理新消息
const handleNewMessage = (newMessage) => {
const currentUserId = currentUserId.value
const currentChatId = currentChatId.value
// 1. 判断是否属于当前聊天窗口
const isCurrentChat =
(newMessage.sender_id !== currentUserId && currentChatId === newMessage.sender_id) ||
(newMessage.sender_id === currentUserId && currentChatId === newMessage.receiver_id)
if (isCurrentChat) {
// 添加到当前窗口的消息列表
messages.value.push(newMessage)
scrollToBottom()
// 如果是别人发给我的,标记为已读
if (newMessage.sender_id !== currentUserId) {
api.put(`/messages/${newMessage.id}/read`)
}
}
// 2. 更新会话列表
const otherUserId = newMessage.sender_id === currentUserId
? newMessage.receiver_id
: newMessage.sender_id
updateConversationPreview(
otherUserId,
newMessage.content,
newMessage.content_type
)
// 3. 更新未读数(如果不是当前窗口且是别人发给我的)
if (newMessage.sender_id !== currentUserId && !isCurrentChat) {
const conv = conversations.value.find(c => c.user_id === newMessage.sender_id)
if (conv) {
conv.unread_count = (conv.unread_count || 0) + 1
}
}
}
// 更新会话预览
const updateConversationPreview = (userId, content, type) => {
const conv = conversations.value.find(c => c.user_id === userId)
if (conv) {
conv.last_message = type === 'TEXT' ? content : `[${type}]`
conv.last_message_type = type
conv.updated_at = new Date().toISOString()
// 移到最前面
conversations.value = [
conv,
...conversations.value.filter(c => c.user_id !== userId)
]
} else {
// 新会话,重新获取
fetchConversations()
}
}
// 滚动到底部
const scrollToBottom = () => {
nextTick(() => {
if (scrollContainer.value) {
scrollContainer.value.scrollTop = scrollContainer.value.scrollHeight
}
})
}
┌─────────────────────────────────────────────────────────────┐
│ 1. 用户A登录 → 获取 JWT Token │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 2. 建立 WebSocket 连接 │
│ ws://host/ws/messages?token=TOKEN │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 3. 服务端验证 Token → 解析出 user_id = A │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 4. 存储连接 │
│ active_connections[A] = [WebSocket连接] │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 5. 用户B发送消息给用户A │
│ POST /messages/ { receiver_id: A, ... } │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 6. 消息保存到数据库 │
│ receiver_id = A │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 7. 后台任务推送 │
│ manager.send_personal_message(payload, receiver_id=A) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 8. 查找连接 │
│ active_connections[A] → 找到用户A的所有连接 │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 9. 推送消息 │
│ 用户A的所有设备都收到消息 │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 10. 前端接收消息 │
│ ws.onmessage → handleNewMessage() │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 11. 判断消息类型和当前窗口 │
│ - 是否属于当前聊天窗口? │
│ - 是别人发给我的,还是我自己发的? │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 12. 更新UI │
│ - 更新消息列表(如果当前窗口) │
│ - 更新会话列表(最后一条消息、未读数) │
│ - 滚动到底部(如果当前窗口) │
└─────────────────────────────────────────────────────────────┘
user_id,推送时使用 receiver_id 查找连接user_id 可以对应多个 WebSocket 连接,所有设备都会收到消息receiver_id 自动路由到正确的用户,前端只需判断是否属于当前窗口以下示例展示前端如何完整地使用消息中心功能。
// 1. 页面加载时获取会话列表
onMounted(() => {
fetchConversations()
})
// 2. 获取会话列表
const fetchConversations = async () => {
try {
const res = await api.get('/messages/conversations')
conversations.value = res.data
initWebSocket() // 初始化 WebSocket
} catch (e) {
console.error('获取会话列表失败:', e)
}
}
const selectChat = async (chat) => {
currentChatId.value = chat.user_id
await loadHistory(chat.user_id)
}
const loadHistory = async (userId) => {
try {
const res = await api.get(`/messages/history/${userId}`, {
params: { skip: 0, limit: 50 }
})
messages.value = res.data.reverse() // API返回最新在前,需要反转显示
// 标记未读消息为已读
const unreadIds = messages.value
.filter(m => !m.is_read && m.receiver_id === currentUserId.value)
.map(m => m.id)
if (unreadIds.length > 0) {
await Promise.all(unreadIds.map(id => api.put(`/messages/${id}/read`)))
}
} catch (e) {
console.error('加载历史消息失败:', e)
}
}
const sendMessage = async () => {
if (!inputMessage.value.trim() || !currentChatId.value) return
const payload = {
receiver_id: currentChatId.value,
content: inputMessage.value,
type: 'MESSAGE',
content_type: 'TEXT',
title: '私信'
}
try {
const res = await api.post('/messages/', payload)
messages.value.push(res.data)
inputMessage.value = ''
} catch (e) {
ElMessage.error('发送失败')
}
}
const handleUpload = async (options) => {
const formData = new FormData()
formData.append('file', options.file)
try {
// 1. 先上传文件
const uploadRes = await api.post('/messages/upload', formData, {
headers: { 'Content-Type': 'multipart/form-data' }
})
// 2. 再发送消息
const payload = {
receiver_id: currentChatId.value,
content: uploadRes.data.key,
type: 'MESSAGE',
content_type: 'IMAGE',
title: '图片'
}
const res = await api.post('/messages/', payload)
messages.value.push(res.data)
} catch (e) {
ElMessage.error('上传失败')
}
}
以下示例展示如何使用 Python 发送通知。
import requests
import time
import hmac
import hashlib
# 配置
API_URL = "https://api.hnyunzhu.com/api/v1/messages/"
APP_ID = "your_app_id_string" # 字符串类型的 app_id
APP_SECRET = "your_app_secret"
def generate_signature(app_id, app_secret):
"""生成签名"""
timestamp = str(int(time.time()))
params = {"app_id": app_id, "timestamp": timestamp}
query_string = "&".join([f"{k}={params[k]}" for k in sorted(params.keys())])
sign = hmac.new(app_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256).hexdigest()
return timestamp, sign
timestamp, sign = generate_signature(APP_ID, APP_SECRET)
# Debug: 打印签名信息
query_string = "&".join([f"{k}={v}" for k, v in sorted({"app_id": APP_ID, "timestamp": timestamp}.items())])
print(f"Debug - Query string for signature: {query_string}")
print(f"Debug - Generated signature: {sign}")
print(f"Debug - Timestamp: {timestamp}")
headers = {
"X-App-Id": APP_ID,
"X-Timestamp": timestamp,
"X-Sign": sign,
"Content-Type": "application/json"
}
payload = {
"app_id": APP_ID,
"app_user_id": "admin",
"type": "NOTIFICATION",
"content_type": "TEXT",
"title": "测试通知",
"content": "这是一条测试通知消息2",
"auto_sso": True, # 开启自动 SSO
"target_url": "http://your-business-system.com/detail/123", # 最终要跳转的业务页面
"action_text": "查看详情"
}
print(f"Debug - Request headers: {headers}")
print(f"Debug - Request payload: {payload}")
resp = requests.post(API_URL, json=payload, headers=headers)
print(f"Status: {resp.status_code}")
print(f"Response Headers: {dict(resp.headers)}")
# Safely handle response - check if it's JSON
try:
if resp.text:
print(f"Response Text: {resp.text}")
try:
print(f"Response JSON: {resp.json()}")
except requests.exceptions.JSONDecodeError:
print("Response is not valid JSON")
else:
print("Response body is empty")
except Exception as e:
print(f"Error reading response: {e}")