| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439 |
- /**
- * 消息 WebSocket:与后端 /api/v1/ws/messages 一致——文本 ping/pong、业务仅 { type:"NEW_MESSAGE", data }
- * 收到其它 JSON(如接入层 control)直接忽略,不当作业务消息
- * 未读角标由 GET /messages/unread-count 刷新;断线重连指数退避;主动 disconnect 不重连
- * 重连成功且停留在某会话时补拉该会话历史(HTTP 兜底)
- * 建连/重连后立即发 ping,规定时间内未收到 pong 则关连接并走自动重连(验证链路可用)
- * 定时心跳 ping 后同样须在超时内收到 pong(或收到业务 NEW_MESSAGE 视为存活),否则关连接重连
- */
- import { getToken, markHistoryReadAll, normalizeMessageContentType } from '../utils/api'
- import { chatStore } from '../store/chat'
- import { fetchContactsList } from './useContacts'
- import { fetchUnreadCountAndUpdateTabBar } from './useUnreadBadge'
- import { fetchMessagesForContact } from './useMessages'
- const WS_BASE = 'wss://api.hnyunzhu.com/api/v1/ws/messages'
- const HEARTBEAT_INTERVAL = 30000
- /** 建连后首包 ping 须在此时长内收到 pong,否则视为假连接并关闭重连 */
- const VERIFY_PONG_TIMEOUT_MS = 8000
- const INITIAL_RECONNECT_DELAY = 1000
- const MAX_RECONNECT_DELAY = 30000
- let socketTask = null
- let heartbeatTimer = null
- let pongVerifyTimer = null
- /** 正在等待建连握手 pong,收到 pong 或超时后清零 */
- let awaitingHandshakePong = false
- let heartbeatPongTimer = null
- /** 定时心跳已发 ping,等待 pong(或业务帧视为存活) */
- let awaitingHeartbeatPong = false
- /** 仅 uni.onSocket* 注册一次,避免每次 connect 叠加监听 */
- let socketListenersAttached = false
- /** 网络恢复监听只注册一次 */
- let networkResumeAttached = false
- let intentionalClose = false
- let reconnectTimer = null
- let reconnectAttempt = 0
- function clearReconnectTimer() {
- if (reconnectTimer) {
- clearTimeout(reconnectTimer)
- reconnectTimer = null
- }
- }
- function clearPongVerifyTimer() {
- if (pongVerifyTimer) {
- clearTimeout(pongVerifyTimer)
- pongVerifyTimer = null
- }
- }
- function clearHeartbeatPongTimer() {
- if (heartbeatPongTimer) {
- clearTimeout(heartbeatPongTimer)
- heartbeatPongTimer = null
- }
- }
- /** 收到业务推送也视为链路存活,避免仅因 pong 与消息顺序问题误杀连接 */
- function markHeartbeatAlive() {
- clearHeartbeatPongTimer()
- awaitingHeartbeatPong = false
- }
- /** 收到 pong:结束握手/心跳等待;并拉未读/会话(与原先一致) */
- function handlePong() {
- if (awaitingHandshakePong) {
- console.log('[WS] handshake ok (pong)')
- }
- clearPongVerifyTimer()
- awaitingHandshakePong = false
- markHeartbeatAlive()
- syncInboxFromServer()
- }
- /** 定时心跳发出 ping 后启动超时,无 pong 则关连接重连 */
- function startHeartbeatPongWatch() {
- clearHeartbeatPongTimer()
- awaitingHeartbeatPong = true
- heartbeatPongTimer = setTimeout(() => {
- if (!awaitingHeartbeatPong || intentionalClose) return
- console.warn('[WS] heartbeat: no pong in time, closing to reconnect')
- awaitingHeartbeatPong = false
- heartbeatPongTimer = null
- try {
- uni.closeSocket()
- } catch (e) {}
- }, HEARTBEAT_PONG_TIMEOUT_MS)
- }
- /**
- * 建连成功后立即 ping,超时无 pong 则 closeSocket → onSocketClose → 自动重连
- */
- function startHandshakeVerify() {
- clearPongVerifyTimer()
- awaitingHandshakePong = true
- try {
- if (socketTask) uni.sendSocketMessage({ data: 'ping' })
- } catch (e) {
- awaitingHandshakePong = false
- return
- }
- pongVerifyTimer = setTimeout(() => {
- if (!awaitingHandshakePong || intentionalClose) return
- console.warn('[WS] verify: no pong in time, closing to reconnect')
- awaitingHandshakePong = false
- pongVerifyTimer = null
- try {
- uni.closeSocket()
- } catch (e) {}
- }, VERIFY_PONG_TIMEOUT_MS)
- }
- function attachNetworkResumeOnce() {
- if (networkResumeAttached) return
- networkResumeAttached = true
- uni.onNetworkStatusChange((res) => {
- if (!res.isConnected || !getToken() || intentionalClose) return
- if (!socketTask) tryConnect()
- })
- }
- /** WS 心跳成功或收到推送后:拉总未读 + 会话列表 */
- function syncInboxFromServer() {
- if (!getToken()) return
- Promise.all([fetchUnreadCountAndUpdateTabBar(), fetchContactsList()]).catch(() => {})
- }
- /** 建连/重连成功:若当前停留在某会话页,补拉最新一页历史;约 1.5s 后再拉一次兜底 */
- function refreshActiveThreadAfterSocketOpen() {
- const cid = chatStore.activeContactId
- if (!cid) return
- fetchMessagesForContact(cid).catch(() => {})
- setTimeout(() => {
- if (String(chatStore.activeContactId || '') !== String(cid)) return
- fetchMessagesForContact(cid).catch(() => {})
- }, 1500)
- }
- function scheduleReconnect() {
- if (intentionalClose) return
- if (!getToken()) return
- clearReconnectTimer()
- const delay = Math.min(
- INITIAL_RECONNECT_DELAY * Math.pow(2, reconnectAttempt),
- MAX_RECONNECT_DELAY
- )
- reconnectAttempt += 1
- console.log(`[WS] reconnect scheduled in ${delay}ms (attempt ${reconnectAttempt})`)
- reconnectTimer = setTimeout(() => {
- reconnectTimer = null
- tryConnect()
- }, delay)
- }
- function normalizeWsMessage(raw) {
- const m = raw.message || raw
- const type = m.type ?? 'MESSAGE'
- // 通知类消息:应用发给用户,当前用户始终为接收方
- const isMe = type === 'NOTIFICATION' ? false : (m.is_me ?? m.isMe)
- const rawText = m.content ?? m.text ?? ''
- const urlField = m.url ?? m.file_url ?? m.content_url
- const content =
- urlField && /^https?:\/\//i.test(String(urlField)) ? String(urlField) : String(rawText)
- return {
- id: String(m.id),
- type,
- senderId: m.sender_id ?? m.senderId,
- receiverId: m.receiver_id ?? m.receiverId,
- content,
- contentType: normalizeMessageContentType(m.content_type ?? m.contentType ?? 'TEXT'),
- title: m.title,
- createdAt: m.created_at ?? m.createdAt,
- isMe,
- actionUrl: m.action_url ?? m.actionUrl,
- actionText: m.action_text ?? m.actionText
- }
- }
- /**
- * 根据推送消息计算会话 ID:私信用 sender_id/receiver_id,通知用负的 app_id
- */
- function getContactIdFromMessage(msg, currentUserId) {
- const type = msg.type ?? 'MESSAGE'
- const appId = msg.app_id ?? msg.appId
- if (type === 'NOTIFICATION' && appId != null && appId !== '') {
- return -Math.abs(Number(appId))
- }
- const senderId = msg.sender_id ?? msg.senderId
- const receiverId = msg.receiver_id ?? msg.receiverId
- // 当前用户是接收方则会话对方是 sender_id,否则是 receiver_id
- if (String(receiverId) === String(currentUserId)) return senderId
- return receiverId
- }
- /** 将 uni 回调里的 data 转为文本(兼容部分端返回 ArrayBuffer) */
- function socketDataToText(res) {
- const d = res && res.data
- if (typeof d === 'string') return d
- if (typeof ArrayBuffer !== 'undefined' && d instanceof ArrayBuffer) {
- try {
- return new TextDecoder('utf-8').decode(d)
- } catch (e) {
- return ''
- }
- }
- return ''
- }
- /**
- * 解析一帧:pong 心跳 | JSON 对象(与后端 send_json 一致)
- * @returns {{ kind: 'pong' } | { kind: 'json', data: object } | null}
- */
- function parseSocketFrame(res) {
- const d = res && res.data
- if (d === 'pong' || (typeof d === 'string' && d.trim() === 'pong')) {
- return { kind: 'pong' }
- }
- if (typeof d === 'string') {
- const t = d.trim()
- if (t === 'pong') return { kind: 'pong' }
- if (!t) return null
- return { kind: 'json', data: JSON.parse(t) }
- }
- if (typeof ArrayBuffer !== 'undefined' && d instanceof ArrayBuffer) {
- const raw = new TextDecoder('utf-8').decode(d)
- if (raw.trim() === 'pong') return { kind: 'pong' }
- if (!raw.trim()) return null
- return { kind: 'json', data: JSON.parse(raw) }
- }
- if (d && typeof d === 'object') {
- return { kind: 'json', data: d }
- }
- return null
- }
- function attachSocketListenersOnce() {
- if (socketListenersAttached) return
- socketListenersAttached = true
- attachNetworkResumeOnce()
- uni.onSocketOpen(() => {
- reconnectAttempt = 0
- clearReconnectTimer()
- clearPongVerifyTimer()
- clearHeartbeatPongTimer()
- awaitingHandshakePong = false
- awaitingHeartbeatPong = false
- console.log('[WS] messages connected')
- syncInboxFromServer()
- refreshActiveThreadAfterSocketOpen()
- heartbeatTimer = setInterval(() => {
- try {
- if (socketTask) {
- uni.sendSocketMessage({ data: 'ping' })
- startHeartbeatPongWatch()
- }
- } catch (e) {}
- }, HEARTBEAT_INTERVAL)
- startHandshakeVerify()
- })
- uni.onSocketMessage((res) => {
- try {
- const parsed = parseSocketFrame(res)
- if (!parsed) {
- const t = socketDataToText(res)
- if (t) console.warn('[WS] unsupported frame', t.slice(0, 120))
- return
- }
- if (parsed.kind === 'pong') {
- handlePong()
- return
- }
- const data = parsed.data
- if (!data || typeof data !== 'object' || Array.isArray(data)) return
- if (data.type !== 'NEW_MESSAGE' || data.data == null) {
- if (typeof data.command !== 'undefined') {
- console.log('[WS] recv (skip non-business)', `command=${data.command}`)
- }
- return
- }
- markHeartbeatAlive()
- const msg = data.data
- const normalized = normalizeWsMessage({ message: msg })
- const currentUserId = data.current_user_id ?? normalized.receiverId
- const contactId = getContactIdFromMessage(msg, currentUserId)
- if (contactId == null) {
- console.warn('[WS] NEW_MESSAGE (ignored: no contactId)', { msg, currentUserId })
- return
- }
- {
- const cid = String(contactId)
- const list = chatStore.messages[cid] || []
- const hasId = normalized.id && list.some((m) => String(m.id) === String(normalized.id))
- if (hasId) return
- chatStore.appendMessage(cid, normalized)
- // 前台 & 后台消息通知:若当前不在该会话,则给出提示
- const isActive = String(chatStore.activeContactId || '') === String(contactId)
- if (!isActive) {
- const contact = (chatStore.contacts || []).find(
- (c) => String(c.user_id || c.id) === String(contactId)
- )
- const title = (contact && contact.title) || '新消息'
- let body = ''
- if (normalized.contentType === 'TEXT') {
- body = normalized.content ? String(normalized.content).slice(0, 50) : ''
- } else if (normalized.contentType === 'IMAGE') {
- body = '[图片]'
- } else if (normalized.contentType === 'VIDEO') {
- body = '[视频]'
- } else if (normalized.contentType === 'USER_NOTIFICATION') {
- body = normalized.title
- ? String(normalized.title).slice(0, 50)
- : normalized.content
- ? String(normalized.content).slice(0, 50)
- : '[通知]'
- } else {
- body = normalized.title || '[文件]'
- }
- // #ifdef APP-PLUS
- try {
- plus.push.createMessage(body || '您有一条新消息', { contactId }, { title })
- } catch (e) {
- // 兜底为 Toast
- uni.showToast({ title: `${title}: ${body || '新消息'}`, icon: 'none' })
- }
- // #endif
- // #ifndef APP-PLUS
- uni.showToast({ title: `${title}: ${body || '新消息'}`, icon: 'none' })
- // #endif
- } else {
- // 正在该会话对话框内收到新消息:标记本会话全部已读,并刷新未读与列表
- const t = getToken()
- if (t) {
- Promise.resolve(markHistoryReadAll(t, cid))
- .then(() =>
- Promise.all([fetchUnreadCountAndUpdateTabBar(), fetchContactsList()])
- )
- .catch(() => {})
- }
- }
- // 只更新该会话在列表中的最后一条预览与时间,不整表刷新
- let preview = ''
- if (normalized.contentType === 'TEXT') {
- preview = normalized.content ? String(normalized.content).slice(0, 50) : ''
- } else if (normalized.contentType === 'IMAGE') preview = '[图片]'
- else if (normalized.contentType === 'VIDEO') preview = '[视频]'
- else if (normalized.contentType === 'USER_NOTIFICATION') {
- preview = normalized.title
- ? String(normalized.title).slice(0, 50)
- : normalized.content
- ? String(normalized.content).slice(0, 50)
- : '[通知]'
- } else preview = normalized.title || '[文件]'
- chatStore.updateContactPreview(cid, { lastMessage: preview, time: normalized.createdAt })
- syncInboxFromServer()
- }
- } catch (e) {
- console.warn('[WS] parse message error', e, res && res.data)
- }
- })
- uni.onSocketError((err) => {
- console.warn('[WS] error', err)
- })
- uni.onSocketClose(() => {
- clearPongVerifyTimer()
- clearHeartbeatPongTimer()
- awaitingHandshakePong = false
- awaitingHeartbeatPong = false
- if (heartbeatTimer) {
- clearInterval(heartbeatTimer)
- heartbeatTimer = null
- }
- socketTask = null
- if (!intentionalClose && getToken()) {
- scheduleReconnect()
- }
- })
- }
- function tryConnect() {
- const token = getToken()
- if (!token || socketTask) return
- intentionalClose = false
- attachSocketListenersOnce()
- const url = `${WS_BASE}?token=${encodeURIComponent(token)}`
- socketTask = uni.connectSocket({
- url,
- success: () => {}
- })
- }
- function performDisconnect() {
- intentionalClose = true
- clearReconnectTimer()
- clearPongVerifyTimer()
- clearHeartbeatPongTimer()
- awaitingHandshakePong = false
- awaitingHeartbeatPong = false
- reconnectAttempt = 0
- if (heartbeatTimer) {
- clearInterval(heartbeatTimer)
- heartbeatTimer = null
- }
- if (socketTask) {
- uni.closeSocket()
- socketTask = null
- }
- }
- /**
- * 登出 / 清空 token 时关闭连接并禁止自动重连(供 setToken 等统一调用)
- */
- export function disconnectWebSocket() {
- performDisconnect()
- }
- /** 启动已登录 / 登录成功后显式建连(与 useWebSocket().connect 相同) */
- export function connectWebSocket() {
- tryConnect()
- }
- export function useWebSocket() {
- function connect() {
- tryConnect()
- }
- function disconnect() {
- performDisconnect()
- }
- return { connect, disconnect }
- }
|