useWebSocket.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. /**
  2. * 消息 WebSocket:与后端 /api/v1/ws/messages 一致——文本 ping/pong、业务仅 { type:"NEW_MESSAGE", data }
  3. * 收到其它 JSON(如接入层 control)直接忽略,不当作业务消息
  4. * 未读角标由 GET /messages/unread-count 刷新;断线重连指数退避;主动 disconnect 不重连
  5. * 重连成功且停留在某会话时补拉该会话历史(HTTP 兜底)
  6. * 建连/重连后立即发 ping,规定时间内未收到 pong 则关连接并走自动重连(验证链路可用)
  7. * 定时心跳 ping 后同样须在超时内收到 pong(或收到业务 NEW_MESSAGE 视为存活),否则关连接重连
  8. */
  9. import { getToken, markHistoryReadAll, normalizeMessageContentType } from '../utils/api'
  10. import { chatStore } from '../store/chat'
  11. import { fetchContactsList } from './useContacts'
  12. import { fetchUnreadCountAndUpdateTabBar } from './useUnreadBadge'
  13. import { fetchMessagesForContact } from './useMessages'
  14. const WS_BASE = 'wss://api.hnyunzhu.com/api/v1/ws/messages'
  15. const HEARTBEAT_INTERVAL = 30000
  16. /** 建连后首包 ping 须在此时长内收到 pong,否则视为假连接并关闭重连 */
  17. const VERIFY_PONG_TIMEOUT_MS = 8000
  18. const INITIAL_RECONNECT_DELAY = 1000
  19. const MAX_RECONNECT_DELAY = 30000
  20. let socketTask = null
  21. let heartbeatTimer = null
  22. let pongVerifyTimer = null
  23. /** 正在等待建连握手 pong,收到 pong 或超时后清零 */
  24. let awaitingHandshakePong = false
  25. let heartbeatPongTimer = null
  26. /** 定时心跳已发 ping,等待 pong(或业务帧视为存活) */
  27. let awaitingHeartbeatPong = false
  28. /** 仅 uni.onSocket* 注册一次,避免每次 connect 叠加监听 */
  29. let socketListenersAttached = false
  30. /** 网络恢复监听只注册一次 */
  31. let networkResumeAttached = false
  32. let intentionalClose = false
  33. let reconnectTimer = null
  34. let reconnectAttempt = 0
  35. function clearReconnectTimer() {
  36. if (reconnectTimer) {
  37. clearTimeout(reconnectTimer)
  38. reconnectTimer = null
  39. }
  40. }
  41. function clearPongVerifyTimer() {
  42. if (pongVerifyTimer) {
  43. clearTimeout(pongVerifyTimer)
  44. pongVerifyTimer = null
  45. }
  46. }
  47. function clearHeartbeatPongTimer() {
  48. if (heartbeatPongTimer) {
  49. clearTimeout(heartbeatPongTimer)
  50. heartbeatPongTimer = null
  51. }
  52. }
  53. /** 收到业务推送也视为链路存活,避免仅因 pong 与消息顺序问题误杀连接 */
  54. function markHeartbeatAlive() {
  55. clearHeartbeatPongTimer()
  56. awaitingHeartbeatPong = false
  57. }
  58. /** 收到 pong:结束握手/心跳等待;并拉未读/会话(与原先一致) */
  59. function handlePong() {
  60. if (awaitingHandshakePong) {
  61. console.log('[WS] handshake ok (pong)')
  62. }
  63. clearPongVerifyTimer()
  64. awaitingHandshakePong = false
  65. markHeartbeatAlive()
  66. syncInboxFromServer()
  67. }
  68. /** 定时心跳发出 ping 后启动超时,无 pong 则关连接重连 */
  69. function startHeartbeatPongWatch() {
  70. clearHeartbeatPongTimer()
  71. awaitingHeartbeatPong = true
  72. heartbeatPongTimer = setTimeout(() => {
  73. if (!awaitingHeartbeatPong || intentionalClose) return
  74. console.warn('[WS] heartbeat: no pong in time, closing to reconnect')
  75. awaitingHeartbeatPong = false
  76. heartbeatPongTimer = null
  77. try {
  78. uni.closeSocket()
  79. } catch (e) {}
  80. }, HEARTBEAT_PONG_TIMEOUT_MS)
  81. }
  82. /**
  83. * 建连成功后立即 ping,超时无 pong 则 closeSocket → onSocketClose → 自动重连
  84. */
  85. function startHandshakeVerify() {
  86. clearPongVerifyTimer()
  87. awaitingHandshakePong = true
  88. try {
  89. if (socketTask) uni.sendSocketMessage({ data: 'ping' })
  90. } catch (e) {
  91. awaitingHandshakePong = false
  92. return
  93. }
  94. pongVerifyTimer = setTimeout(() => {
  95. if (!awaitingHandshakePong || intentionalClose) return
  96. console.warn('[WS] verify: no pong in time, closing to reconnect')
  97. awaitingHandshakePong = false
  98. pongVerifyTimer = null
  99. try {
  100. uni.closeSocket()
  101. } catch (e) {}
  102. }, VERIFY_PONG_TIMEOUT_MS)
  103. }
  104. function attachNetworkResumeOnce() {
  105. if (networkResumeAttached) return
  106. networkResumeAttached = true
  107. uni.onNetworkStatusChange((res) => {
  108. if (!res.isConnected || !getToken() || intentionalClose) return
  109. if (!socketTask) tryConnect()
  110. })
  111. }
  112. /** WS 心跳成功或收到推送后:拉总未读 + 会话列表 */
  113. function syncInboxFromServer() {
  114. if (!getToken()) return
  115. Promise.all([fetchUnreadCountAndUpdateTabBar(), fetchContactsList()]).catch(() => {})
  116. }
  117. /** 建连/重连成功:若当前停留在某会话页,补拉最新一页历史;约 1.5s 后再拉一次兜底 */
  118. function refreshActiveThreadAfterSocketOpen() {
  119. const cid = chatStore.activeContactId
  120. if (!cid) return
  121. fetchMessagesForContact(cid).catch(() => {})
  122. setTimeout(() => {
  123. if (String(chatStore.activeContactId || '') !== String(cid)) return
  124. fetchMessagesForContact(cid).catch(() => {})
  125. }, 1500)
  126. }
  127. function scheduleReconnect() {
  128. if (intentionalClose) return
  129. if (!getToken()) return
  130. clearReconnectTimer()
  131. const delay = Math.min(
  132. INITIAL_RECONNECT_DELAY * Math.pow(2, reconnectAttempt),
  133. MAX_RECONNECT_DELAY
  134. )
  135. reconnectAttempt += 1
  136. console.log(`[WS] reconnect scheduled in ${delay}ms (attempt ${reconnectAttempt})`)
  137. reconnectTimer = setTimeout(() => {
  138. reconnectTimer = null
  139. tryConnect()
  140. }, delay)
  141. }
  142. function normalizeWsMessage(raw) {
  143. const m = raw.message || raw
  144. const type = m.type ?? 'MESSAGE'
  145. // 通知类消息:应用发给用户,当前用户始终为接收方
  146. const isMe = type === 'NOTIFICATION' ? false : (m.is_me ?? m.isMe)
  147. const rawText = m.content ?? m.text ?? ''
  148. const urlField = m.url ?? m.file_url ?? m.content_url
  149. const content =
  150. urlField && /^https?:\/\//i.test(String(urlField)) ? String(urlField) : String(rawText)
  151. return {
  152. id: String(m.id),
  153. type,
  154. senderId: m.sender_id ?? m.senderId,
  155. receiverId: m.receiver_id ?? m.receiverId,
  156. content,
  157. contentType: normalizeMessageContentType(m.content_type ?? m.contentType ?? 'TEXT'),
  158. title: m.title,
  159. createdAt: m.created_at ?? m.createdAt,
  160. isMe,
  161. actionUrl: m.action_url ?? m.actionUrl,
  162. actionText: m.action_text ?? m.actionText
  163. }
  164. }
  165. /**
  166. * 根据推送消息计算会话 ID:私信用 sender_id/receiver_id,通知用负的 app_id
  167. */
  168. function getContactIdFromMessage(msg, currentUserId) {
  169. const type = msg.type ?? 'MESSAGE'
  170. const appId = msg.app_id ?? msg.appId
  171. if (type === 'NOTIFICATION' && appId != null && appId !== '') {
  172. return -Math.abs(Number(appId))
  173. }
  174. const senderId = msg.sender_id ?? msg.senderId
  175. const receiverId = msg.receiver_id ?? msg.receiverId
  176. // 当前用户是接收方则会话对方是 sender_id,否则是 receiver_id
  177. if (String(receiverId) === String(currentUserId)) return senderId
  178. return receiverId
  179. }
  180. /** 将 uni 回调里的 data 转为文本(兼容部分端返回 ArrayBuffer) */
  181. function socketDataToText(res) {
  182. const d = res && res.data
  183. if (typeof d === 'string') return d
  184. if (typeof ArrayBuffer !== 'undefined' && d instanceof ArrayBuffer) {
  185. try {
  186. return new TextDecoder('utf-8').decode(d)
  187. } catch (e) {
  188. return ''
  189. }
  190. }
  191. return ''
  192. }
  193. /**
  194. * 解析一帧:pong 心跳 | JSON 对象(与后端 send_json 一致)
  195. * @returns {{ kind: 'pong' } | { kind: 'json', data: object } | null}
  196. */
  197. function parseSocketFrame(res) {
  198. const d = res && res.data
  199. if (d === 'pong' || (typeof d === 'string' && d.trim() === 'pong')) {
  200. return { kind: 'pong' }
  201. }
  202. if (typeof d === 'string') {
  203. const t = d.trim()
  204. if (t === 'pong') return { kind: 'pong' }
  205. if (!t) return null
  206. return { kind: 'json', data: JSON.parse(t) }
  207. }
  208. if (typeof ArrayBuffer !== 'undefined' && d instanceof ArrayBuffer) {
  209. const raw = new TextDecoder('utf-8').decode(d)
  210. if (raw.trim() === 'pong') return { kind: 'pong' }
  211. if (!raw.trim()) return null
  212. return { kind: 'json', data: JSON.parse(raw) }
  213. }
  214. if (d && typeof d === 'object') {
  215. return { kind: 'json', data: d }
  216. }
  217. return null
  218. }
  219. function attachSocketListenersOnce() {
  220. if (socketListenersAttached) return
  221. socketListenersAttached = true
  222. attachNetworkResumeOnce()
  223. uni.onSocketOpen(() => {
  224. reconnectAttempt = 0
  225. clearReconnectTimer()
  226. clearPongVerifyTimer()
  227. clearHeartbeatPongTimer()
  228. awaitingHandshakePong = false
  229. awaitingHeartbeatPong = false
  230. console.log('[WS] messages connected')
  231. syncInboxFromServer()
  232. refreshActiveThreadAfterSocketOpen()
  233. heartbeatTimer = setInterval(() => {
  234. try {
  235. if (socketTask) {
  236. uni.sendSocketMessage({ data: 'ping' })
  237. startHeartbeatPongWatch()
  238. }
  239. } catch (e) {}
  240. }, HEARTBEAT_INTERVAL)
  241. startHandshakeVerify()
  242. })
  243. uni.onSocketMessage((res) => {
  244. try {
  245. const parsed = parseSocketFrame(res)
  246. if (!parsed) {
  247. const t = socketDataToText(res)
  248. if (t) console.warn('[WS] unsupported frame', t.slice(0, 120))
  249. return
  250. }
  251. if (parsed.kind === 'pong') {
  252. handlePong()
  253. return
  254. }
  255. const data = parsed.data
  256. if (!data || typeof data !== 'object' || Array.isArray(data)) return
  257. if (data.type !== 'NEW_MESSAGE' || data.data == null) {
  258. if (typeof data.command !== 'undefined') {
  259. console.log('[WS] recv (skip non-business)', `command=${data.command}`)
  260. }
  261. return
  262. }
  263. markHeartbeatAlive()
  264. const msg = data.data
  265. const normalized = normalizeWsMessage({ message: msg })
  266. const currentUserId = data.current_user_id ?? normalized.receiverId
  267. const contactId = getContactIdFromMessage(msg, currentUserId)
  268. if (contactId == null) {
  269. console.warn('[WS] NEW_MESSAGE (ignored: no contactId)', { msg, currentUserId })
  270. return
  271. }
  272. {
  273. const cid = String(contactId)
  274. const list = chatStore.messages[cid] || []
  275. const hasId = normalized.id && list.some((m) => String(m.id) === String(normalized.id))
  276. if (hasId) return
  277. chatStore.appendMessage(cid, normalized)
  278. // 前台 & 后台消息通知:若当前不在该会话,则给出提示
  279. const isActive = String(chatStore.activeContactId || '') === String(contactId)
  280. if (!isActive) {
  281. const contact = (chatStore.contacts || []).find(
  282. (c) => String(c.user_id || c.id) === String(contactId)
  283. )
  284. const title = (contact && contact.title) || '新消息'
  285. let body = ''
  286. if (normalized.contentType === 'TEXT') {
  287. body = normalized.content ? String(normalized.content).slice(0, 50) : ''
  288. } else if (normalized.contentType === 'IMAGE') {
  289. body = '[图片]'
  290. } else if (normalized.contentType === 'VIDEO') {
  291. body = '[视频]'
  292. } else if (normalized.contentType === 'USER_NOTIFICATION') {
  293. body = normalized.title
  294. ? String(normalized.title).slice(0, 50)
  295. : normalized.content
  296. ? String(normalized.content).slice(0, 50)
  297. : '[通知]'
  298. } else {
  299. body = normalized.title || '[文件]'
  300. }
  301. // #ifdef APP-PLUS
  302. try {
  303. plus.push.createMessage(body || '您有一条新消息', { contactId }, { title })
  304. } catch (e) {
  305. // 兜底为 Toast
  306. uni.showToast({ title: `${title}: ${body || '新消息'}`, icon: 'none' })
  307. }
  308. // #endif
  309. // #ifndef APP-PLUS
  310. uni.showToast({ title: `${title}: ${body || '新消息'}`, icon: 'none' })
  311. // #endif
  312. } else {
  313. // 正在该会话对话框内收到新消息:标记本会话全部已读,并刷新未读与列表
  314. const t = getToken()
  315. if (t) {
  316. Promise.resolve(markHistoryReadAll(t, cid))
  317. .then(() =>
  318. Promise.all([fetchUnreadCountAndUpdateTabBar(), fetchContactsList()])
  319. )
  320. .catch(() => {})
  321. }
  322. }
  323. // 只更新该会话在列表中的最后一条预览与时间,不整表刷新
  324. let preview = ''
  325. if (normalized.contentType === 'TEXT') {
  326. preview = normalized.content ? String(normalized.content).slice(0, 50) : ''
  327. } else if (normalized.contentType === 'IMAGE') preview = '[图片]'
  328. else if (normalized.contentType === 'VIDEO') preview = '[视频]'
  329. else if (normalized.contentType === 'USER_NOTIFICATION') {
  330. preview = normalized.title
  331. ? String(normalized.title).slice(0, 50)
  332. : normalized.content
  333. ? String(normalized.content).slice(0, 50)
  334. : '[通知]'
  335. } else preview = normalized.title || '[文件]'
  336. chatStore.updateContactPreview(cid, { lastMessage: preview, time: normalized.createdAt })
  337. syncInboxFromServer()
  338. }
  339. } catch (e) {
  340. console.warn('[WS] parse message error', e, res && res.data)
  341. }
  342. })
  343. uni.onSocketError((err) => {
  344. console.warn('[WS] error', err)
  345. })
  346. uni.onSocketClose(() => {
  347. clearPongVerifyTimer()
  348. clearHeartbeatPongTimer()
  349. awaitingHandshakePong = false
  350. awaitingHeartbeatPong = false
  351. if (heartbeatTimer) {
  352. clearInterval(heartbeatTimer)
  353. heartbeatTimer = null
  354. }
  355. socketTask = null
  356. if (!intentionalClose && getToken()) {
  357. scheduleReconnect()
  358. }
  359. })
  360. }
  361. function tryConnect() {
  362. const token = getToken()
  363. if (!token || socketTask) return
  364. intentionalClose = false
  365. attachSocketListenersOnce()
  366. const url = `${WS_BASE}?token=${encodeURIComponent(token)}`
  367. socketTask = uni.connectSocket({
  368. url,
  369. success: () => {}
  370. })
  371. }
  372. function performDisconnect() {
  373. intentionalClose = true
  374. clearReconnectTimer()
  375. clearPongVerifyTimer()
  376. clearHeartbeatPongTimer()
  377. awaitingHandshakePong = false
  378. awaitingHeartbeatPong = false
  379. reconnectAttempt = 0
  380. if (heartbeatTimer) {
  381. clearInterval(heartbeatTimer)
  382. heartbeatTimer = null
  383. }
  384. if (socketTask) {
  385. uni.closeSocket()
  386. socketTask = null
  387. }
  388. }
  389. /**
  390. * 登出 / 清空 token 时关闭连接并禁止自动重连(供 setToken 等统一调用)
  391. */
  392. export function disconnectWebSocket() {
  393. performDisconnect()
  394. }
  395. /** 启动已登录 / 登录成功后显式建连(与 useWebSocket().connect 相同) */
  396. export function connectWebSocket() {
  397. tryConnect()
  398. }
  399. export function useWebSocket() {
  400. function connect() {
  401. tryConnect()
  402. }
  403. function disconnect() {
  404. performDisconnect()
  405. }
  406. return { connect, disconnect }
  407. }