messages.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574
  1. from typing import Any, List, Optional
  2. from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
  3. from sqlalchemy.orm import Session, joinedload
  4. from sqlalchemy import or_, and_, desc
  5. from app.core.database import get_db
  6. from app.api.v1 import deps
  7. from app.api.v1.deps import AuthSubject, get_current_user_or_app
  8. from app.models.message import Message, MessageType
  9. from app.models.user import User
  10. from app.models.application import Application
  11. from app.models.mapping import AppUserMapping
  12. from app.schemas.message import MessageCreate, MessageResponse, MessageUpdate, ContentType, ConversationResponse
  13. from app.core.websocket_manager import manager
  14. from app.core.config import settings
  15. from app.core.minio import minio_storage
  16. from app.services.ticket_service import TicketService
  17. from datetime import datetime
  18. from urllib.parse import quote, urlparse, parse_qs, urlencode, urlunparse
  19. import json
  20. router = APIRouter()
  21. def _process_message_content(message: Message) -> MessageResponse:
  22. """处理消息内容,为文件类型生成预签名 URL"""
  23. # Pydantic v2 use model_validate
  24. response = MessageResponse.model_validate(message)
  25. # 填充应用名称
  26. if message.app_id and message.app:
  27. response.app_name = message.app.app_name
  28. if message.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
  29. # 如果内容是对象 Key (不以 http 开头),则生成预签名 URL
  30. # 如果是旧数据的完整 URL,则保持不变 (或视需求处理)
  31. if message.content and not message.content.startswith("http"):
  32. signed_url = minio_storage.get_presigned_url(message.content)
  33. if signed_url:
  34. response.content = signed_url
  35. return response
  36. @router.post("/", response_model=MessageResponse)
  37. async def create_message(
  38. *,
  39. db: Session = Depends(get_db),
  40. message_in: MessageCreate,
  41. current_subject: AuthSubject = Depends(get_current_user_or_app),
  42. background_tasks: BackgroundTasks
  43. ) -> Any:
  44. """
  45. 发送消息 (支持用户私信和应用通知)
  46. 权限:
  47. - 用户:只能发送 MESSAGE
  48. - 应用:可以发送 NOTIFICATION 或 MESSAGE
  49. """
  50. sender_id = None
  51. app_id_int = None # 数据库中的 Integer ID(用于存储到 Message 表)
  52. app_id_str = None # 字符串类型的 app_id(用于 SSO 跳转链接)
  53. # 1. 鉴权与身份识别
  54. if isinstance(current_subject, User):
  55. # 用户发送
  56. if message_in.type == MessageType.NOTIFICATION:
  57. raise HTTPException(status_code=403, detail="普通用户无权发送系统通知")
  58. sender_id = current_subject.id
  59. # 如果用户发送时提供了 app_id(字符串),需要查找对应的应用
  60. if message_in.app_id:
  61. app = db.query(Application).filter(Application.app_id == message_in.app_id).first()
  62. if not app:
  63. raise HTTPException(status_code=404, detail=f"应用未找到: {message_in.app_id}")
  64. app_id_int = app.id
  65. app_id_str = app.app_id
  66. elif isinstance(current_subject, Application):
  67. # 应用发送
  68. app_id_int = current_subject.id
  69. app_id_str = current_subject.app_id
  70. # 安全校验: 如果传入了 app_id(字符串),确保与签名身份一致
  71. if message_in.app_id:
  72. if message_in.app_id != current_subject.app_id:
  73. raise HTTPException(
  74. status_code=403,
  75. detail=f"传入的 app_id ({message_in.app_id}) 与认证应用不匹配"
  76. )
  77. # 使用当前认证应用的 app_id(字符串)
  78. message_in.app_id = app_id_str
  79. # 2. 确定接收者 (Receiver Resolution)
  80. final_receiver_id = None
  81. is_broadcast = getattr(message_in, "is_broadcast", False)
  82. if is_broadcast:
  83. if message_in.type != MessageType.NOTIFICATION:
  84. raise HTTPException(status_code=403, detail="广播模式仅支持系统通知 (NOTIFICATION)")
  85. elif message_in.receiver_id:
  86. # 方式 A: 直接指定 User ID
  87. final_receiver_id = message_in.receiver_id
  88. user = db.query(User).filter(User.id == final_receiver_id).first()
  89. if not user:
  90. raise HTTPException(status_code=404, detail="接收用户未找到")
  91. elif message_in.app_user_id and message_in.app_id:
  92. # 方式 B: 通过 App User ID 查找映射
  93. # 注意:如果是用户发送,必须要提供 app_id 才能查映射
  94. # 如果是应用发送,message_in.app_id 已经是字符串类型
  95. # 如果 app_id_int 还没有设置(用户发送且提供了字符串 app_id),需要查找
  96. if app_id_int is None:
  97. app = db.query(Application).filter(Application.app_id == message_in.app_id).first()
  98. if not app:
  99. raise HTTPException(status_code=404, detail=f"应用未找到: {message_in.app_id}")
  100. app_id_int = app.id
  101. app_id_str = app.app_id
  102. mapping = db.query(AppUserMapping).filter(
  103. AppUserMapping.app_id == app_id_int, # 使用 Integer ID 查询映射表
  104. AppUserMapping.mapped_key == message_in.app_user_id
  105. ).first()
  106. if not mapping:
  107. raise HTTPException(
  108. status_code=404,
  109. detail=f"用户映射未找到: App {message_in.app_id}, User {message_in.app_user_id}"
  110. )
  111. final_receiver_id = mapping.user_id
  112. else:
  113. raise HTTPException(status_code=400, detail="必须指定 receiver_id 或 (app_id + app_user_id),或者设置 is_broadcast=True")
  114. # 3. 处理 SSO 跳转链接 (Link Generation)
  115. final_action_url = message_in.action_url
  116. if message_in.type == MessageType.NOTIFICATION and message_in.auto_sso and app_id_str and message_in.target_url:
  117. # 生成 jump 接口 URL,用户点击时调用后端接口生成 callback URL
  118. # 格式: {PLATFORM_URL}/api/v1/simple/sso/jump?app_id={APP_ID}&redirect_to={TARGET_URL}
  119. base_url = settings.API_V1_STR # /api/v1
  120. encoded_target = quote(message_in.target_url)
  121. final_action_url = f"{base_url}/simple/sso/jump?app_id={app_id_str}&redirect_to={encoded_target}"
  122. # 处理内容 (如果是文件类型且传入的是 URL,尝试提取 Key)
  123. content_val = message_in.content if isinstance(message_in.content, str) else str(message_in.content)
  124. if message_in.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
  125. # 简单判断: 如果包含 bucket name,可能是 URL
  126. if settings.MINIO_BUCKET_NAME in content_val and "http" in content_val:
  127. try:
  128. # 尝试从 URL 中提取 path
  129. parsed = urlparse(content_val)
  130. path = parsed.path.lstrip('/')
  131. # path 可能是 "bucket_name/object_key"
  132. if path.startswith(settings.MINIO_BUCKET_NAME + "/"):
  133. content_val = path[len(settings.MINIO_BUCKET_NAME)+1:]
  134. except:
  135. pass # 提取失败则保持原样
  136. # 4. 创建消息
  137. if is_broadcast:
  138. # 查找所有活跃用户 (is_deleted=0)
  139. active_users = db.query(User.id).filter(
  140. User.status == "ACTIVE",
  141. User.is_deleted == 0
  142. ).all()
  143. if not active_users:
  144. raise HTTPException(status_code=400, detail="没有可发送的活跃用户")
  145. messages_to_insert = []
  146. for u in active_users:
  147. messages_to_insert.append(
  148. Message(
  149. sender_id=sender_id,
  150. receiver_id=u.id,
  151. app_id=app_id_int,
  152. type=message_in.type,
  153. content_type=message_in.content_type,
  154. title=message_in.title,
  155. content=content_val,
  156. action_url=final_action_url,
  157. action_text=message_in.action_text
  158. )
  159. )
  160. db.add_all(messages_to_insert)
  161. db.commit()
  162. # 使用第一条消息进行签名作为接口的返回值
  163. db.refresh(messages_to_insert[0])
  164. processed_msg = _process_message_content(messages_to_insert[0])
  165. # 提取推送所需数据,避免在异步任务中触发延迟加载
  166. push_tasks = [{"id": msg.id, "receiver_id": msg.receiver_id} for msg in messages_to_insert]
  167. async def send_broadcast_ws():
  168. for t in push_tasks:
  169. push_payload = {
  170. "type": "NEW_MESSAGE",
  171. "data": {
  172. "id": t["id"],
  173. "type": processed_msg.type,
  174. "content_type": processed_msg.content_type,
  175. "title": processed_msg.title,
  176. "content": processed_msg.content,
  177. "action_url": processed_msg.action_url,
  178. "action_text": processed_msg.action_text,
  179. "sender_name": "系统通知" if not sender_id else "用户私信",
  180. "sender_id": sender_id,
  181. "created_at": str(processed_msg.created_at),
  182. "app_id": processed_msg.app_id,
  183. "app_name": processed_msg.app_name,
  184. }
  185. }
  186. await manager.send_personal_message(push_payload, t["receiver_id"])
  187. background_tasks.add_task(send_broadcast_ws)
  188. return processed_msg
  189. else:
  190. message = Message(
  191. sender_id=sender_id,
  192. receiver_id=final_receiver_id,
  193. app_id=app_id_int, # 使用 Integer ID 存储到数据库
  194. type=message_in.type,
  195. content_type=message_in.content_type,
  196. title=message_in.title,
  197. content=content_val,
  198. action_url=final_action_url,
  199. action_text=message_in.action_text
  200. )
  201. db.add(message)
  202. db.commit()
  203. db.refresh(message)
  204. # 5. 触发实时推送 (WebSocket)
  205. # 处理用于推送的消息内容 (签名)
  206. processed_msg = _process_message_content(message)
  207. push_payload = {
  208. "type": "NEW_MESSAGE",
  209. "data": {
  210. "id": processed_msg.id,
  211. "type": processed_msg.type,
  212. "content_type": processed_msg.content_type,
  213. "title": processed_msg.title,
  214. "content": processed_msg.content, # 使用签名后的 URL
  215. "action_url": processed_msg.action_url,
  216. "action_text": processed_msg.action_text,
  217. "sender_name": "系统通知" if not sender_id else "用户私信", # 简化处理
  218. "sender_id": sender_id, # Add sender_id for frontend to decide left/right
  219. "created_at": str(processed_msg.created_at),
  220. # 附加应用信息,便于前端按应用拆分系统通知会话
  221. "app_id": message.app_id,
  222. "app_name": message.app.app_name if message.app else None,
  223. }
  224. }
  225. # 使用后台任务发送 WS 消息,避免阻塞 HTTP 响应
  226. # 如果是发给自己,receiver_id == sender_id,ws 会收到一次
  227. background_tasks.add_task(manager.send_personal_message, push_payload, final_receiver_id)
  228. return processed_msg
  229. @router.get("/", response_model=List[MessageResponse])
  230. def read_messages(
  231. db: Session = Depends(get_db),
  232. skip: int = 0,
  233. limit: int = 100,
  234. unread_only: bool = False,
  235. current_user: User = Depends(deps.get_current_active_user),
  236. ) -> Any:
  237. """
  238. 获取当前用户的消息列表 (所有历史记录)
  239. """
  240. query = db.query(Message).options(joinedload(Message.app)).filter(Message.receiver_id == current_user.id)
  241. if unread_only:
  242. query = query.filter(Message.is_read == False)
  243. messages = query.order_by(Message.created_at.desc()).offset(skip).limit(limit).all()
  244. # 处理文件 URL 签名
  245. return [_process_message_content(msg) for msg in messages]
  246. @router.get("/conversations", response_model=List[ConversationResponse])
  247. def get_conversations(
  248. db: Session = Depends(get_db),
  249. current_user: User = Depends(deps.get_current_active_user)
  250. ) -> Any:
  251. """
  252. 获取当前用户的会话列表 (聚合)
  253. - 私信:按用户聚合
  254. - 系统通知:按应用(app)拆分成多个会话,类似多个“系统私信”
  255. """
  256. # 查找所有与我相关的消息,并预加载 app 信息,便于显示应用名
  257. messages = (
  258. db.query(Message)
  259. .options(joinedload(Message.app))
  260. .filter(
  261. or_(
  262. Message.sender_id == current_user.id,
  263. Message.receiver_id == current_user.id,
  264. )
  265. )
  266. .order_by(Message.created_at.desc())
  267. .limit(1000)
  268. .all()
  269. )
  270. conversations_map: dict[int, dict] = {}
  271. for msg in messages:
  272. # 确定对话方 (Counterpart)
  273. other_id = None
  274. other_user = None
  275. # 系统通知:按 app 拆分。约定:同一个 app 的系统会话使用负数 id,避免和真实用户冲突
  276. if msg.type == MessageType.NOTIFICATION:
  277. if msg.app_id:
  278. other_id = -int(msg.app_id)
  279. else:
  280. # 兼容历史数据:没有 app_id 的系统通知归为一个“系统通知”会话
  281. other_id = 0
  282. elif msg.sender_id == current_user.id and msg.receiver_id == current_user.id:
  283. # 文件传输助手
  284. other_id = current_user.id
  285. other_user = current_user
  286. elif msg.sender_id == current_user.id:
  287. other_id = msg.receiver_id
  288. other_user = msg.receiver
  289. else:
  290. other_id = msg.sender_id
  291. other_user = msg.sender
  292. # 如果是私信但没找到用户,跳过
  293. if other_id not in (0, None) and not other_user and msg.type != MessageType.NOTIFICATION:
  294. continue
  295. # 如果这个对话方还没处理过
  296. if other_id not in conversations_map:
  297. if msg.type == MessageType.NOTIFICATION:
  298. # 系统会话
  299. if msg.app_id and msg.app:
  300. username = msg.app.app_id or f"APP-{msg.app_id}"
  301. full_name = msg.app.app_name or username
  302. is_system = True
  303. app_id = msg.app_id
  304. app_name = msg.app.app_name
  305. else:
  306. # 老的统一系统通知
  307. username = "System"
  308. full_name = "系统通知"
  309. is_system = True
  310. app_id = None
  311. app_name = None
  312. else:
  313. # 普通用户会话
  314. username = other_user.mobile # User has mobile, not username
  315. full_name = other_user.name or other_user.english_name or other_user.mobile
  316. is_system = False
  317. app_id = None
  318. app_name = None
  319. conversations_map[other_id] = {
  320. "user_id": other_id,
  321. "username": username,
  322. "full_name": full_name,
  323. "unread_count": 0,
  324. "last_message": msg.content if msg.content_type == ContentType.TEXT else f"[{msg.content_type}]",
  325. "last_message_type": msg.content_type,
  326. "updated_at": msg.created_at,
  327. "is_system": is_system,
  328. "app_id": app_id,
  329. "app_name": app_name,
  330. }
  331. # 累加未读数 (只计算接收方是自己的未读消息)
  332. # 注意: 这里的 is_read 是针对接收者的状态
  333. # 即使是自己发送的消息,msg.receiver_id 也不会是自己(除非发给自己)
  334. # 所以这里的判断逻辑是: 如果我是接收者,且未读,则计入未读数
  335. if not msg.is_read and msg.receiver_id == current_user.id:
  336. conversations_map[other_id]["unread_count"] += 1
  337. return list(conversations_map.values())
  338. @router.get("/history/{other_user_id}", response_model=List[MessageResponse])
  339. def get_chat_history(
  340. other_user_id: int,
  341. skip: int = 0,
  342. limit: int = 50,
  343. db: Session = Depends(get_db),
  344. current_user: User = Depends(deps.get_current_active_user)
  345. ) -> Any:
  346. """
  347. 获取与特定用户/系统会话的聊天记录
  348. - other_user_id > 0: 普通用户私信
  349. - other_user_id == 0: 兼容历史的“所有系统通知”会话
  350. - other_user_id < 0: 按 app 拆分的系统通知会话(-app_id)
  351. """
  352. if other_user_id == 0:
  353. # 所有系统通知(兼容旧实现)
  354. query = db.query(Message).options(joinedload(Message.app)).filter(
  355. Message.receiver_id == current_user.id,
  356. Message.type == MessageType.NOTIFICATION,
  357. ).order_by(Message.created_at.desc())
  358. elif other_user_id < 0:
  359. # 单个应用的系统通知会话
  360. app_id = -other_user_id
  361. query = db.query(Message).options(joinedload(Message.app)).filter(
  362. Message.receiver_id == current_user.id,
  363. Message.type == MessageType.NOTIFICATION,
  364. Message.app_id == app_id,
  365. ).order_by(Message.created_at.desc())
  366. else:
  367. # 用户私信
  368. query = db.query(Message).options(joinedload(Message.app)).filter(
  369. or_(
  370. and_(Message.sender_id == current_user.id, Message.receiver_id == other_user_id),
  371. and_(Message.sender_id == other_user_id, Message.receiver_id == current_user.id),
  372. )
  373. ).order_by(Message.created_at.desc()) # 最新在前
  374. messages = query.offset(skip).limit(limit).all()
  375. return [_process_message_content(msg) for msg in messages]
  376. @router.get("/unread-count", response_model=int)
  377. def get_unread_count(
  378. db: Session = Depends(get_db),
  379. current_user: User = Depends(deps.get_current_active_user),
  380. ) -> Any:
  381. count = db.query(Message).filter(
  382. Message.receiver_id == current_user.id,
  383. Message.is_read == False
  384. ).count()
  385. return count
  386. @router.put("/{message_id}/read", response_model=MessageResponse)
  387. def mark_as_read(
  388. message_id: int,
  389. db: Session = Depends(get_db),
  390. current_user: User = Depends(deps.get_current_active_user),
  391. ) -> Any:
  392. message = db.query(Message).filter(
  393. Message.id == message_id,
  394. Message.receiver_id == current_user.id
  395. ).first()
  396. if not message:
  397. raise HTTPException(status_code=404, detail="Message not found")
  398. if not message.is_read:
  399. message.is_read = True
  400. message.read_at = datetime.now()
  401. db.add(message)
  402. db.commit()
  403. db.refresh(message)
  404. return _process_message_content(message)
  405. @router.put("/read-all", response_model=dict)
  406. def mark_all_read(
  407. db: Session = Depends(get_db),
  408. current_user: User = Depends(deps.get_current_active_user),
  409. ) -> Any:
  410. now = datetime.now()
  411. result = db.query(Message).filter(
  412. Message.receiver_id == current_user.id,
  413. Message.is_read == False
  414. ).update(
  415. {
  416. "is_read": True,
  417. "read_at": now
  418. },
  419. synchronize_session=False
  420. )
  421. db.commit()
  422. return {"updated_count": result}
  423. @router.delete("/{message_id}", response_model=MessageResponse)
  424. def delete_message(
  425. message_id: int,
  426. db: Session = Depends(get_db),
  427. current_user: User = Depends(deps.get_current_active_user),
  428. ) -> Any:
  429. message = db.query(Message).filter(
  430. Message.id == message_id,
  431. Message.receiver_id == current_user.id
  432. ).first()
  433. if not message:
  434. raise HTTPException(status_code=404, detail="Message not found")
  435. # 先处理返回数据,避免删除后无法访问
  436. processed_msg = _process_message_content(message)
  437. db.delete(message)
  438. db.commit()
  439. return processed_msg
  440. @router.get("/{message_id}/callback-url", response_model=dict)
  441. def get_message_callback_url(
  442. message_id: int,
  443. db: Session = Depends(get_db),
  444. current_user: User = Depends(deps.get_current_active_user),
  445. ) -> Any:
  446. """
  447. 获取消息的 callback URL(用于通知按钮点击)
  448. 内部执行 jump 接口的逻辑,实时生成 ticket 和 callback URL
  449. """
  450. # 1. 获取消息
  451. message = db.query(Message).filter(Message.id == message_id).first()
  452. if not message:
  453. raise HTTPException(status_code=404, detail="消息未找到")
  454. # 2. 验证权限:只有接收者可以获取
  455. if message.receiver_id != current_user.id:
  456. raise HTTPException(status_code=403, detail="无权访问此消息")
  457. # 3. 检查是否有 action_url(jump 接口 URL)
  458. if not message.action_url:
  459. raise HTTPException(status_code=400, detail="此消息没有配置跳转链接")
  460. # 4. 解析 action_url,提取 app_id 和 redirect_to
  461. # action_url 格式: /api/v1/simple/sso/jump?app_id=xxx&redirect_to=xxx
  462. parsed = urlparse(message.action_url)
  463. if not parsed.path.endswith("/sso/jump"):
  464. raise HTTPException(status_code=400, detail="无效的跳转链接格式")
  465. query_params = parse_qs(parsed.query)
  466. app_id = query_params.get("app_id", [None])[0]
  467. redirect_to = query_params.get("redirect_to", [None])[0]
  468. if not app_id or not redirect_to:
  469. raise HTTPException(status_code=400, detail="跳转链接参数不完整")
  470. # 5. 执行 jump 接口的逻辑(但不返回 RedirectResponse,而是返回 JSON)
  471. app = db.query(Application).filter(Application.app_id == app_id).first()
  472. if not app:
  473. raise HTTPException(status_code=404, detail="应用未找到")
  474. # 6. 生成 Ticket(使用当前登录用户)
  475. ticket = TicketService.generate_ticket(current_user.id, app_id)
  476. # 7. 获取应用回调地址
  477. redirect_base = ""
  478. if app.redirect_uris:
  479. try:
  480. uris = json.loads(app.redirect_uris)
  481. if isinstance(uris, list) and len(uris) > 0:
  482. redirect_base = uris[0]
  483. elif isinstance(uris, str):
  484. redirect_base = uris
  485. except (json.JSONDecodeError, TypeError):
  486. redirect_base = app.redirect_uris.strip()
  487. if not redirect_base:
  488. raise HTTPException(status_code=400, detail="应用未配置回调地址")
  489. # 8. 构造最终 callback URL
  490. parsed_uri = urlparse(redirect_base)
  491. callback_query_params = parse_qs(parsed_uri.query)
  492. callback_query_params['ticket'] = [ticket]
  493. callback_query_params['next'] = [redirect_to]
  494. new_query = urlencode(callback_query_params, doseq=True)
  495. callback_url = urlunparse((
  496. parsed_uri.scheme,
  497. parsed_uri.netloc,
  498. parsed_uri.path,
  499. parsed_uri.params,
  500. new_query,
  501. parsed_uri.fragment
  502. ))
  503. return {"callback_url": callback_url}