Преглед изворни кода

v1.0.2 修复消息延迟问题

liuq пре 2 месеци
родитељ
комит
f00ba59d7b
2 измењених фајлова са 207 додато и 31 уклоњено
  1. 19 11
      composables/useMessages.js
  2. 188 20
      composables/useWebSocket.js

+ 19 - 11
composables/useMessages.js

@@ -48,19 +48,27 @@ function normalizeMessageList(list, currentUserId) {
 	return (list || []).map((m) => normalizeMessage(m, currentUserId)).filter(Boolean)
 }
 
+/**
+ * 拉取某会话最新一页历史(与聊天页 fetchMessages 同源逻辑)。
+ * 供 WebSocket 重连后补拉断线期间消息,避免仅依赖推送延迟。
+ */
+export async function fetchMessagesForContact(contactId) {
+	const token = getToken()
+	if (!token) return
+	const currentUserId = getUserIdFromToken(token)
+	try {
+		const data = await getMessages(token, contactId, { skip: 0, limit: 50 })
+		const list = Array.isArray(data) ? data : (data.messages || data.list || data.items || [])
+		chatStore.setMessagesForContact(contactId, normalizeMessageList(list, currentUserId))
+		chatStore.markContactLoaded(contactId)
+	} catch (e) {
+		chatStore.setMessagesForContact(contactId, [])
+	}
+}
+
 export function useMessages() {
 	async function fetchMessages(contactId) {
-		const token = getToken()
-		if (!token) return
-		const currentUserId = getUserIdFromToken(token)
-		try {
-			const data = await getMessages(token, contactId, { skip: 0, limit: 50 })
-			const list = Array.isArray(data) ? data : (data.messages || data.list || data.items || [])
-			chatStore.setMessagesForContact(contactId, normalizeMessageList(list, currentUserId))
-			chatStore.markContactLoaded(contactId)
-		} catch (e) {
-			chatStore.setMessagesForContact(contactId, [])
-		}
+		return fetchMessagesForContact(contactId)
 	}
 
 	async function fetchMoreMessages(contactId) {

+ 188 - 20
composables/useWebSocket.js

@@ -1,22 +1,36 @@
 /**
- * 消息 WebSocket:登录后连接,收到新消息时更新 chatStore(消息列表、会话预览),不整表刷新
- * 未读角标由 GET /messages/unread-count 刷新,不在此本地累加
- * 断线后自动重连(指数退避);主动 disconnect 时不重连
+ * 消息 WebSocket:与后端 /api/v1/ws/messages 一致——文本 ping/pong、业务仅 { type:"NEW_MESSAGE", data }
+ * 收到其它 JSON(如接入层 control)直接忽略,不当作业务消息
+ * 未读角标由 GET /messages/unread-count 刷新;断线重连指数退避;主动 disconnect 不重连
+ * 重连成功且停留在某会话时补拉该会话历史(HTTP 兜底)
+ * 建连/重连后立即发 ping,规定时间内未收到 pong 则关连接并走自动重连(验证链路可用)
+ * 定时心跳 ping 后同样须在超时内收到 pong(或收到业务 NEW_MESSAGE 视为存活),否则关连接重连
  */
 import { getToken, markHistoryReadAll, normalizeMessageContentType } from '../utils/api'
 import { chatStore } from '../store/chat'
 import { fetchContactsList } from './useContacts'
 import { fetchUnreadCountAndUpdateTabBar } from './useUnreadBadge'
+import { fetchMessagesForContact } from './useMessages'
 
 const WS_BASE = 'wss://api.hnyunzhu.com/api/v1/ws/messages'
 const HEARTBEAT_INTERVAL = 30000
+/** 建连后首包 ping 须在此时长内收到 pong,否则视为假连接并关闭重连 */
+const VERIFY_PONG_TIMEOUT_MS = 8000
 const INITIAL_RECONNECT_DELAY = 1000
 const MAX_RECONNECT_DELAY = 30000
 
 let socketTask = null
 let heartbeatTimer = null
+let pongVerifyTimer = null
+/** 正在等待建连握手 pong,收到 pong 或超时后清零 */
+let awaitingHandshakePong = false
+let heartbeatPongTimer = null
+/** 定时心跳已发 ping,等待 pong(或业务帧视为存活) */
+let awaitingHeartbeatPong = false
 /** 仅 uni.onSocket* 注册一次,避免每次 connect 叠加监听 */
 let socketListenersAttached = false
+/** 网络恢复监听只注册一次 */
+let networkResumeAttached = false
 let intentionalClose = false
 let reconnectTimer = null
 let reconnectAttempt = 0
@@ -28,12 +42,101 @@ function clearReconnectTimer() {
 	}
 }
 
+function clearPongVerifyTimer() {
+	if (pongVerifyTimer) {
+		clearTimeout(pongVerifyTimer)
+		pongVerifyTimer = null
+	}
+}
+
+function clearHeartbeatPongTimer() {
+	if (heartbeatPongTimer) {
+		clearTimeout(heartbeatPongTimer)
+		heartbeatPongTimer = null
+	}
+}
+
+/** 收到业务推送也视为链路存活,避免仅因 pong 与消息顺序问题误杀连接 */
+function markHeartbeatAlive() {
+	clearHeartbeatPongTimer()
+	awaitingHeartbeatPong = false
+}
+
+/** 收到 pong:结束握手/心跳等待;并拉未读/会话(与原先一致) */
+function handlePong() {
+	if (awaitingHandshakePong) {
+		console.log('[WS] handshake ok (pong)')
+	}
+	clearPongVerifyTimer()
+	awaitingHandshakePong = false
+	markHeartbeatAlive()
+	syncInboxFromServer()
+}
+
+/** 定时心跳发出 ping 后启动超时,无 pong 则关连接重连 */
+function startHeartbeatPongWatch() {
+	clearHeartbeatPongTimer()
+	awaitingHeartbeatPong = true
+	heartbeatPongTimer = setTimeout(() => {
+		if (!awaitingHeartbeatPong || intentionalClose) return
+		console.warn('[WS] heartbeat: no pong in time, closing to reconnect')
+		awaitingHeartbeatPong = false
+		heartbeatPongTimer = null
+		try {
+			uni.closeSocket()
+		} catch (e) {}
+	}, HEARTBEAT_PONG_TIMEOUT_MS)
+}
+
+/**
+ * 建连成功后立即 ping,超时无 pong 则 closeSocket → onSocketClose → 自动重连
+ */
+function startHandshakeVerify() {
+	clearPongVerifyTimer()
+	awaitingHandshakePong = true
+	try {
+		if (socketTask) uni.sendSocketMessage({ data: 'ping' })
+	} catch (e) {
+		awaitingHandshakePong = false
+		return
+	}
+	pongVerifyTimer = setTimeout(() => {
+		if (!awaitingHandshakePong || intentionalClose) return
+		console.warn('[WS] verify: no pong in time, closing to reconnect')
+		awaitingHandshakePong = false
+		pongVerifyTimer = null
+		try {
+			uni.closeSocket()
+		} catch (e) {}
+	}, VERIFY_PONG_TIMEOUT_MS)
+}
+
+function attachNetworkResumeOnce() {
+	if (networkResumeAttached) return
+	networkResumeAttached = true
+	uni.onNetworkStatusChange((res) => {
+		if (!res.isConnected || !getToken() || intentionalClose) return
+		if (!socketTask) tryConnect()
+	})
+}
+
 /** WS 心跳成功或收到推送后:拉总未读 + 会话列表 */
 function syncInboxFromServer() {
 	if (!getToken()) return
 	Promise.all([fetchUnreadCountAndUpdateTabBar(), fetchContactsList()]).catch(() => {})
 }
 
+/** 建连/重连成功:若当前停留在某会话页,补拉最新一页历史;约 1.5s 后再拉一次兜底 */
+function refreshActiveThreadAfterSocketOpen() {
+	const cid = chatStore.activeContactId
+	if (!cid) return
+	fetchMessagesForContact(cid).catch(() => {})
+	setTimeout(() => {
+		if (String(chatStore.activeContactId || '') !== String(cid)) return
+		fetchMessagesForContact(cid).catch(() => {})
+	}, 1500)
+}
+
 function scheduleReconnect() {
 	if (intentionalClose) return
 	if (!getToken()) return
@@ -90,54 +193,110 @@ function getContactIdFromMessage(msg, currentUserId) {
 	return receiverId
 }
 
+/** 将 uni 回调里的 data 转为文本(兼容部分端返回 ArrayBuffer) */
+function socketDataToText(res) {
+	const d = res && res.data
+	if (typeof d === 'string') return d
+	if (typeof ArrayBuffer !== 'undefined' && d instanceof ArrayBuffer) {
+		try {
+			return new TextDecoder('utf-8').decode(d)
+		} catch (e) {
+			return ''
+		}
+	}
+	return ''
+}
+
+/**
+ * 解析一帧:pong 心跳 | JSON 对象(与后端 send_json 一致)
+ * @returns {{ kind: 'pong' } | { kind: 'json', data: object } | null}
+ */
+function parseSocketFrame(res) {
+	const d = res && res.data
+	if (d === 'pong' || (typeof d === 'string' && d.trim() === 'pong')) {
+		return { kind: 'pong' }
+	}
+	if (typeof d === 'string') {
+		const t = d.trim()
+		if (t === 'pong') return { kind: 'pong' }
+		if (!t) return null
+		return { kind: 'json', data: JSON.parse(t) }
+	}
+	if (typeof ArrayBuffer !== 'undefined' && d instanceof ArrayBuffer) {
+		const raw = new TextDecoder('utf-8').decode(d)
+		if (raw.trim() === 'pong') return { kind: 'pong' }
+		if (!raw.trim()) return null
+		return { kind: 'json', data: JSON.parse(raw) }
+	}
+	if (d && typeof d === 'object') {
+		return { kind: 'json', data: d }
+	}
+	return null
+}
+
 function attachSocketListenersOnce() {
 	if (socketListenersAttached) return
 	socketListenersAttached = true
+	attachNetworkResumeOnce()
 
 	uni.onSocketOpen(() => {
 		reconnectAttempt = 0
 		clearReconnectTimer()
+		clearPongVerifyTimer()
+		clearHeartbeatPongTimer()
+		awaitingHandshakePong = false
+		awaitingHeartbeatPong = false
 		console.log('[WS] messages connected')
 		syncInboxFromServer()
+		refreshActiveThreadAfterSocketOpen()
 		heartbeatTimer = setInterval(() => {
 			try {
-				if (socketTask) uni.sendSocketMessage({ data: 'ping' })
+				if (socketTask) {
+					uni.sendSocketMessage({ data: 'ping' })
+					startHeartbeatPongWatch()
+				}
 			} catch (e) {}
 		}, HEARTBEAT_INTERVAL)
+		startHandshakeVerify()
 	})
 
 	uni.onSocketMessage((res) => {
 		try {
-			// 心跳:服务端回复 pong
-			if (res.data === 'pong') {
-				syncInboxFromServer()
+			const parsed = parseSocketFrame(res)
+			if (!parsed) {
+				const t = socketDataToText(res)
+				if (t) console.warn('[WS] unsupported frame', t.slice(0, 120))
+				return
+			}
+			if (parsed.kind === 'pong') {
+				handlePong()
 				return
 			}
-			const data = typeof res.data === 'string' ? JSON.parse(res.data) : res.data
-			console.log('[WS] recv', data)
-			syncInboxFromServer()
-			// 文档格式:{ type: 'NEW_MESSAGE', data: { id, sender_id, receiver_id, ... } }
-			const msg = data.type === 'NEW_MESSAGE' ? data.data : (data.message ?? data)
-			if (!msg) {
-				console.log('[WS] recv (ignored: no message payload)')
+
+			const data = parsed.data
+			if (!data || typeof data !== 'object' || Array.isArray(data)) return
+			if (data.type !== 'NEW_MESSAGE' || data.data == null) {
+				if (typeof data.command !== 'undefined') {
+					console.log('[WS] recv (skip non-business)', `command=${data.command}`)
+				}
 				return
 			}
+
+			markHeartbeatAlive()
+
+			const msg = data.data
 			const normalized = normalizeWsMessage({ message: msg })
-			// 需要当前用户 id 判断会话方:若后端推送里带 current_user_id 用那个,否则用 receiver_id 判断
 			const currentUserId = data.current_user_id ?? normalized.receiverId
 			const contactId = getContactIdFromMessage(msg, currentUserId)
 			if (contactId == null) {
-				console.log('[WS] recv (ignored: no contactId)', { msg, currentUserId })
+				console.warn('[WS] NEW_MESSAGE (ignored: no contactId)', { msg, currentUserId })
 				return
 			}
 			{
 				const cid = String(contactId)
 				const list = chatStore.messages[cid] || []
 				const hasId = normalized.id && list.some((m) => String(m.id) === String(normalized.id))
-				if (hasId) {
-					console.log('[WS] recv (ignored: duplicate id)', String(normalized.id))
-					return
-				}
+				if (hasId) return
 				chatStore.appendMessage(cid, normalized)
 				// 前台 & 后台消息通知:若当前不在该会话,则给出提示
 				const isActive = String(chatStore.activeContactId || '') === String(contactId)
@@ -198,6 +357,7 @@ function attachSocketListenersOnce() {
 							: '[通知]'
 				} else preview = normalized.title || '[文件]'
 				chatStore.updateContactPreview(cid, { lastMessage: preview, time: normalized.createdAt })
+				syncInboxFromServer()
 			}
 		} catch (e) {
 			console.warn('[WS] parse message error', e, res && res.data)
@@ -209,6 +369,10 @@ function attachSocketListenersOnce() {
 	})
 
 	uni.onSocketClose(() => {
+		clearPongVerifyTimer()
+		clearHeartbeatPongTimer()
+		awaitingHandshakePong = false
+		awaitingHeartbeatPong = false
 		if (heartbeatTimer) {
 			clearInterval(heartbeatTimer)
 			heartbeatTimer = null
@@ -235,6 +399,10 @@ function tryConnect() {
 function performDisconnect() {
 	intentionalClose = true
 	clearReconnectTimer()
+	clearPongVerifyTimer()
+	clearHeartbeatPongTimer()
+	awaitingHandshakePong = false
+	awaitingHeartbeatPong = false
 	reconnectAttempt = 0
 	if (heartbeatTimer) {
 		clearInterval(heartbeatTimer)