/** * 消息 WebSocket:与后端 /api/v1/ws/messages 一致——文本 ping/pong、业务仅 { type:"NEW_MESSAGE", data } * 收到其它 JSON(如接入层 control)直接忽略,不当作业务消息 * 未读角标由 GET /messages/unread-count 刷新;断线重连指数退避;主动 disconnect 不重连 * 重连成功且停留在某会话时补拉该会话历史(HTTP 兜底) * 建连/重连后立即发 ping,规定时间内未收到 pong 则关连接并走自动重连(验证链路可用) * 定时心跳 ping 后同样须在超时内收到 pong(或收到业务 NEW_MESSAGE 视为存活),否则关连接重连 */ import { getToken, markHistoryReadAll, normalizeMessageContentType } from '../utils/api' import { chatStore } from '../store/chat' import { fetchContactsList } from './useContacts' import { fetchUnreadCountAndUpdateTabBar } from './useUnreadBadge' import { fetchMessagesForContact } from './useMessages' const WS_BASE = 'wss://api.hnyunzhu.com/api/v1/ws/messages' const HEARTBEAT_INTERVAL = 30000 /** 建连后首包 ping 须在此时长内收到 pong,否则视为假连接并关闭重连 */ const VERIFY_PONG_TIMEOUT_MS = 8000 const INITIAL_RECONNECT_DELAY = 1000 const MAX_RECONNECT_DELAY = 30000 let socketTask = null let heartbeatTimer = null let pongVerifyTimer = null /** 正在等待建连握手 pong,收到 pong 或超时后清零 */ let awaitingHandshakePong = false let heartbeatPongTimer = null /** 定时心跳已发 ping,等待 pong(或业务帧视为存活) */ let awaitingHeartbeatPong = false /** 仅 uni.onSocket* 注册一次,避免每次 connect 叠加监听 */ let socketListenersAttached = false /** 网络恢复监听只注册一次 */ let networkResumeAttached = false let intentionalClose = false let reconnectTimer = null let reconnectAttempt = 0 function clearReconnectTimer() { if (reconnectTimer) { clearTimeout(reconnectTimer) reconnectTimer = null } } function clearPongVerifyTimer() { if (pongVerifyTimer) { clearTimeout(pongVerifyTimer) pongVerifyTimer = null } } function clearHeartbeatPongTimer() { if (heartbeatPongTimer) { clearTimeout(heartbeatPongTimer) heartbeatPongTimer = null } } /** 收到业务推送也视为链路存活,避免仅因 pong 与消息顺序问题误杀连接 */ function markHeartbeatAlive() { clearHeartbeatPongTimer() awaitingHeartbeatPong = false } /** 收到 pong:结束握手/心跳等待;并拉未读/会话(与原先一致) */ function handlePong() { if (awaitingHandshakePong) { console.log('[WS] handshake ok (pong)') } clearPongVerifyTimer() awaitingHandshakePong = false markHeartbeatAlive() syncInboxFromServer() } /** 定时心跳发出 ping 后启动超时,无 pong 则关连接重连 */ function startHeartbeatPongWatch() { clearHeartbeatPongTimer() awaitingHeartbeatPong = true heartbeatPongTimer = setTimeout(() => { if (!awaitingHeartbeatPong || intentionalClose) return console.warn('[WS] heartbeat: no pong in time, closing to reconnect') awaitingHeartbeatPong = false heartbeatPongTimer = null try { uni.closeSocket() } catch (e) {} }, HEARTBEAT_PONG_TIMEOUT_MS) } /** * 建连成功后立即 ping,超时无 pong 则 closeSocket → onSocketClose → 自动重连 */ function startHandshakeVerify() { clearPongVerifyTimer() awaitingHandshakePong = true try { if (socketTask) uni.sendSocketMessage({ data: 'ping' }) } catch (e) { awaitingHandshakePong = false return } pongVerifyTimer = setTimeout(() => { if (!awaitingHandshakePong || intentionalClose) return console.warn('[WS] verify: no pong in time, closing to reconnect') awaitingHandshakePong = false pongVerifyTimer = null try { uni.closeSocket() } catch (e) {} }, VERIFY_PONG_TIMEOUT_MS) } function attachNetworkResumeOnce() { if (networkResumeAttached) return networkResumeAttached = true uni.onNetworkStatusChange((res) => { if (!res.isConnected || !getToken() || intentionalClose) return if (!socketTask) tryConnect() }) } /** WS 心跳成功或收到推送后:拉总未读 + 会话列表 */ function syncInboxFromServer() { if (!getToken()) return Promise.all([fetchUnreadCountAndUpdateTabBar(), fetchContactsList()]).catch(() => {}) } /** 建连/重连成功:若当前停留在某会话页,补拉最新一页历史;约 1.5s 后再拉一次兜底 */ function refreshActiveThreadAfterSocketOpen() { const cid = chatStore.activeContactId if (!cid) return fetchMessagesForContact(cid).catch(() => {}) setTimeout(() => { if (String(chatStore.activeContactId || '') !== String(cid)) return fetchMessagesForContact(cid).catch(() => {}) }, 1500) } function scheduleReconnect() { if (intentionalClose) return if (!getToken()) return clearReconnectTimer() const delay = Math.min( INITIAL_RECONNECT_DELAY * Math.pow(2, reconnectAttempt), MAX_RECONNECT_DELAY ) reconnectAttempt += 1 console.log(`[WS] reconnect scheduled in ${delay}ms (attempt ${reconnectAttempt})`) reconnectTimer = setTimeout(() => { reconnectTimer = null tryConnect() }, delay) } function normalizeWsMessage(raw) { const m = raw.message || raw const type = m.type ?? 'MESSAGE' // 通知类消息:应用发给用户,当前用户始终为接收方 const isMe = type === 'NOTIFICATION' ? false : (m.is_me ?? m.isMe) const rawText = m.content ?? m.text ?? '' const urlField = m.url ?? m.file_url ?? m.content_url const content = urlField && /^https?:\/\//i.test(String(urlField)) ? String(urlField) : String(rawText) return { id: String(m.id), type, senderId: m.sender_id ?? m.senderId, receiverId: m.receiver_id ?? m.receiverId, content, contentType: normalizeMessageContentType(m.content_type ?? m.contentType ?? 'TEXT'), title: m.title, createdAt: m.created_at ?? m.createdAt, isMe, actionUrl: m.action_url ?? m.actionUrl, actionText: m.action_text ?? m.actionText } } /** * 根据推送消息计算会话 ID:私信用 sender_id/receiver_id,通知用负的 app_id */ function getContactIdFromMessage(msg, currentUserId) { const type = msg.type ?? 'MESSAGE' const appId = msg.app_id ?? msg.appId if (type === 'NOTIFICATION' && appId != null && appId !== '') { return -Math.abs(Number(appId)) } const senderId = msg.sender_id ?? msg.senderId const receiverId = msg.receiver_id ?? msg.receiverId // 当前用户是接收方则会话对方是 sender_id,否则是 receiver_id if (String(receiverId) === String(currentUserId)) return senderId return receiverId } /** 将 uni 回调里的 data 转为文本(兼容部分端返回 ArrayBuffer) */ function socketDataToText(res) { const d = res && res.data if (typeof d === 'string') return d if (typeof ArrayBuffer !== 'undefined' && d instanceof ArrayBuffer) { try { return new TextDecoder('utf-8').decode(d) } catch (e) { return '' } } return '' } /** * 解析一帧:pong 心跳 | JSON 对象(与后端 send_json 一致) * @returns {{ kind: 'pong' } | { kind: 'json', data: object } | null} */ function parseSocketFrame(res) { const d = res && res.data if (d === 'pong' || (typeof d === 'string' && d.trim() === 'pong')) { return { kind: 'pong' } } if (typeof d === 'string') { const t = d.trim() if (t === 'pong') return { kind: 'pong' } if (!t) return null return { kind: 'json', data: JSON.parse(t) } } if (typeof ArrayBuffer !== 'undefined' && d instanceof ArrayBuffer) { const raw = new TextDecoder('utf-8').decode(d) if (raw.trim() === 'pong') return { kind: 'pong' } if (!raw.trim()) return null return { kind: 'json', data: JSON.parse(raw) } } if (d && typeof d === 'object') { return { kind: 'json', data: d } } return null } function attachSocketListenersOnce() { if (socketListenersAttached) return socketListenersAttached = true attachNetworkResumeOnce() uni.onSocketOpen(() => { reconnectAttempt = 0 clearReconnectTimer() clearPongVerifyTimer() clearHeartbeatPongTimer() awaitingHandshakePong = false awaitingHeartbeatPong = false console.log('[WS] messages connected') syncInboxFromServer() refreshActiveThreadAfterSocketOpen() heartbeatTimer = setInterval(() => { try { if (socketTask) { uni.sendSocketMessage({ data: 'ping' }) startHeartbeatPongWatch() } } catch (e) {} }, HEARTBEAT_INTERVAL) startHandshakeVerify() }) uni.onSocketMessage((res) => { try { const parsed = parseSocketFrame(res) if (!parsed) { const t = socketDataToText(res) if (t) console.warn('[WS] unsupported frame', t.slice(0, 120)) return } if (parsed.kind === 'pong') { handlePong() return } const data = parsed.data if (!data || typeof data !== 'object' || Array.isArray(data)) return if (data.type !== 'NEW_MESSAGE' || data.data == null) { if (typeof data.command !== 'undefined') { console.log('[WS] recv (skip non-business)', `command=${data.command}`) } return } markHeartbeatAlive() const msg = data.data const normalized = normalizeWsMessage({ message: msg }) const currentUserId = data.current_user_id ?? normalized.receiverId const contactId = getContactIdFromMessage(msg, currentUserId) if (contactId == null) { console.warn('[WS] NEW_MESSAGE (ignored: no contactId)', { msg, currentUserId }) return } { const cid = String(contactId) const list = chatStore.messages[cid] || [] const hasId = normalized.id && list.some((m) => String(m.id) === String(normalized.id)) if (hasId) return chatStore.appendMessage(cid, normalized) // 前台 & 后台消息通知:若当前不在该会话,则给出提示 const isActive = String(chatStore.activeContactId || '') === String(contactId) if (!isActive) { const contact = (chatStore.contacts || []).find( (c) => String(c.user_id || c.id) === String(contactId) ) const title = (contact && contact.title) || '新消息' let body = '' if (normalized.contentType === 'TEXT') { body = normalized.content ? String(normalized.content).slice(0, 50) : '' } else if (normalized.contentType === 'IMAGE') { body = '[图片]' } else if (normalized.contentType === 'VIDEO') { body = '[视频]' } else if (normalized.contentType === 'USER_NOTIFICATION') { body = normalized.title ? String(normalized.title).slice(0, 50) : normalized.content ? String(normalized.content).slice(0, 50) : '[通知]' } else { body = normalized.title || '[文件]' } // #ifdef APP-PLUS try { plus.push.createMessage(body || '您有一条新消息', { contactId }, { title }) } catch (e) { // 兜底为 Toast uni.showToast({ title: `${title}: ${body || '新消息'}`, icon: 'none' }) } // #endif // #ifndef APP-PLUS uni.showToast({ title: `${title}: ${body || '新消息'}`, icon: 'none' }) // #endif } else { // 正在该会话对话框内收到新消息:标记本会话全部已读,并刷新未读与列表 const t = getToken() if (t) { Promise.resolve(markHistoryReadAll(t, cid)) .then(() => Promise.all([fetchUnreadCountAndUpdateTabBar(), fetchContactsList()]) ) .catch(() => {}) } } // 只更新该会话在列表中的最后一条预览与时间,不整表刷新 let preview = '' if (normalized.contentType === 'TEXT') { preview = normalized.content ? String(normalized.content).slice(0, 50) : '' } else if (normalized.contentType === 'IMAGE') preview = '[图片]' else if (normalized.contentType === 'VIDEO') preview = '[视频]' else if (normalized.contentType === 'USER_NOTIFICATION') { preview = normalized.title ? String(normalized.title).slice(0, 50) : normalized.content ? String(normalized.content).slice(0, 50) : '[通知]' } else preview = normalized.title || '[文件]' chatStore.updateContactPreview(cid, { lastMessage: preview, time: normalized.createdAt }) syncInboxFromServer() } } catch (e) { console.warn('[WS] parse message error', e, res && res.data) } }) uni.onSocketError((err) => { console.warn('[WS] error', err) }) uni.onSocketClose(() => { clearPongVerifyTimer() clearHeartbeatPongTimer() awaitingHandshakePong = false awaitingHeartbeatPong = false if (heartbeatTimer) { clearInterval(heartbeatTimer) heartbeatTimer = null } socketTask = null if (!intentionalClose && getToken()) { scheduleReconnect() } }) } function tryConnect() { const token = getToken() if (!token || socketTask) return intentionalClose = false attachSocketListenersOnce() const url = `${WS_BASE}?token=${encodeURIComponent(token)}` socketTask = uni.connectSocket({ url, success: () => {} }) } function performDisconnect() { intentionalClose = true clearReconnectTimer() clearPongVerifyTimer() clearHeartbeatPongTimer() awaitingHandshakePong = false awaitingHeartbeatPong = false reconnectAttempt = 0 if (heartbeatTimer) { clearInterval(heartbeatTimer) heartbeatTimer = null } if (socketTask) { uni.closeSocket() socketTask = null } } /** * 登出 / 清空 token 时关闭连接并禁止自动重连(供 setToken 等统一调用) */ export function disconnectWebSocket() { performDisconnect() } /** 启动已登录 / 登录成功后显式建连(与 useWebSocket().connect 相同) */ export function connectWebSocket() { tryConnect() } export function useWebSocket() { function connect() { tryConnect() } function disconnect() { performDisconnect() } return { connect, disconnect } }