messages.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. from typing import Any, List, Optional
  2. from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
  3. from sqlalchemy.orm import Session
  4. from sqlalchemy import or_, and_, desc
  5. from app.core.database import get_db
  6. from app.api.v1 import deps
  7. from app.api.v1.deps import AuthSubject, get_current_user_or_app
  8. from app.models.message import Message, MessageType
  9. from app.models.user import User
  10. from app.models.application import Application
  11. from app.models.mapping import AppUserMapping
  12. from app.schemas.message import MessageCreate, MessageResponse, MessageUpdate, ContentType, ConversationResponse
  13. from app.core.websocket_manager import manager
  14. from app.core.config import settings
  15. from app.core.minio import minio_storage
  16. from datetime import datetime
  17. from urllib.parse import quote, urlparse
  18. router = APIRouter()
  19. def _process_message_content(message: Message) -> MessageResponse:
  20. """处理消息内容,为文件类型生成预签名 URL"""
  21. # Pydantic v2 use model_validate
  22. response = MessageResponse.model_validate(message)
  23. if message.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
  24. # 如果内容是对象 Key (不以 http 开头),则生成预签名 URL
  25. # 如果是旧数据的完整 URL,则保持不变 (或视需求处理)
  26. if message.content and not message.content.startswith("http"):
  27. signed_url = minio_storage.get_presigned_url(message.content)
  28. if signed_url:
  29. response.content = signed_url
  30. return response
  31. @router.post("/", response_model=MessageResponse)
  32. async def create_message(
  33. *,
  34. db: Session = Depends(get_db),
  35. message_in: MessageCreate,
  36. current_subject: AuthSubject = Depends(get_current_user_or_app),
  37. background_tasks: BackgroundTasks
  38. ) -> Any:
  39. """
  40. 发送消息 (支持用户私信和应用通知)
  41. 权限:
  42. - 用户:只能发送 MESSAGE
  43. - 应用:可以发送 NOTIFICATION 或 MESSAGE
  44. """
  45. sender_id = None
  46. app_id = None
  47. # 1. 鉴权与身份识别
  48. if isinstance(current_subject, User):
  49. # 用户发送
  50. if message_in.type == MessageType.NOTIFICATION:
  51. raise HTTPException(status_code=403, detail="普通用户无权发送系统通知")
  52. sender_id = current_subject.id
  53. elif isinstance(current_subject, Application):
  54. # 应用发送
  55. app_id = current_subject.id
  56. # 安全校验: 确保传入的 app_id (如果有) 与签名身份一致
  57. if message_in.app_id and message_in.app_id != current_subject.id:
  58. # 这里我们选择忽略传入的 app_id,强制使用当前认证的应用 ID
  59. pass
  60. message_in.app_id = app_id
  61. # 2. 确定接收者 (Receiver Resolution)
  62. final_receiver_id = None
  63. if message_in.receiver_id:
  64. # 方式 A: 直接指定 User ID
  65. final_receiver_id = message_in.receiver_id
  66. user = db.query(User).filter(User.id == final_receiver_id).first()
  67. if not user:
  68. raise HTTPException(status_code=404, detail="接收用户未找到")
  69. elif message_in.app_user_id and message_in.app_id:
  70. # 方式 B: 通过 App User ID 查找映射
  71. # 注意:如果是用户发送,必须要提供 app_id 才能查映射
  72. # 如果是应用发送,message_in.app_id 已经被赋值为 current_subject.id
  73. mapping = db.query(AppUserMapping).filter(
  74. AppUserMapping.app_id == message_in.app_id,
  75. AppUserMapping.app_user_id == message_in.app_user_id
  76. ).first()
  77. if not mapping:
  78. raise HTTPException(
  79. status_code=404,
  80. detail=f"用户映射未找到: App {message_in.app_id}, User {message_in.app_user_id}"
  81. )
  82. final_receiver_id = mapping.user_id
  83. else:
  84. raise HTTPException(status_code=400, detail="必须指定 receiver_id 或 (app_id + app_user_id)")
  85. # 3. 处理 SSO 跳转链接 (Link Generation)
  86. final_action_url = message_in.action_url
  87. if message_in.type == MessageType.NOTIFICATION and message_in.auto_sso and message_in.app_id and message_in.target_url:
  88. # 构造 SSO 中转链接
  89. # 格式: {PLATFORM_URL}/api/v1/auth/sso/jump?app_id={APP_ID}&redirect_to={TARGET_URL}
  90. # 假设 settings.SERVER_HOST 配置了当前服务地址,如果没有则使用相对路径或默认值
  91. # 这里为了演示,假设前端或API base url
  92. base_url = settings.API_V1_STR # /api/v1
  93. encoded_target = quote(message_in.target_url)
  94. final_action_url = f"{base_url}/auth/sso/jump?app_id={message_in.app_id}&redirect_to={encoded_target}"
  95. # 处理内容 (如果是文件类型且传入的是 URL,尝试提取 Key)
  96. content_val = message_in.content if isinstance(message_in.content, str) else str(message_in.content)
  97. if message_in.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
  98. # 简单判断: 如果包含 bucket name,可能是 URL
  99. if settings.MINIO_BUCKET_NAME in content_val and "http" in content_val:
  100. try:
  101. # 尝试从 URL 中提取 path
  102. parsed = urlparse(content_val)
  103. path = parsed.path.lstrip('/')
  104. # path 可能是 "bucket_name/object_key"
  105. if path.startswith(settings.MINIO_BUCKET_NAME + "/"):
  106. content_val = path[len(settings.MINIO_BUCKET_NAME)+1:]
  107. except:
  108. pass # 提取失败则保持原样
  109. # 4. 创建消息
  110. message = Message(
  111. sender_id=sender_id,
  112. receiver_id=final_receiver_id,
  113. app_id=message_in.app_id,
  114. type=message_in.type,
  115. content_type=message_in.content_type,
  116. title=message_in.title,
  117. content=content_val,
  118. action_url=final_action_url,
  119. action_text=message_in.action_text
  120. )
  121. db.add(message)
  122. db.commit()
  123. db.refresh(message)
  124. # 5. 触发实时推送 (WebSocket)
  125. # 处理用于推送的消息内容 (签名)
  126. processed_msg = _process_message_content(message)
  127. push_payload = {
  128. "type": "NEW_MESSAGE",
  129. "data": {
  130. "id": processed_msg.id,
  131. "type": processed_msg.type,
  132. "content_type": processed_msg.content_type,
  133. "title": processed_msg.title,
  134. "content": processed_msg.content, # 使用签名后的 URL
  135. "action_url": processed_msg.action_url,
  136. "action_text": processed_msg.action_text,
  137. "sender_name": "系统通知" if not sender_id else "用户私信", # 简化处理
  138. "sender_id": sender_id, # Add sender_id for frontend to decide left/right
  139. "created_at": str(processed_msg.created_at)
  140. }
  141. }
  142. # 使用后台任务发送 WS 消息,避免阻塞 HTTP 响应
  143. # 如果是发给自己,receiver_id == sender_id,ws 会收到一次
  144. background_tasks.add_task(manager.send_personal_message, push_payload, final_receiver_id)
  145. return processed_msg
  146. @router.get("/", response_model=List[MessageResponse])
  147. def read_messages(
  148. db: Session = Depends(get_db),
  149. skip: int = 0,
  150. limit: int = 100,
  151. unread_only: bool = False,
  152. current_user: User = Depends(deps.get_current_active_user),
  153. ) -> Any:
  154. """
  155. 获取当前用户的消息列表 (所有历史记录)
  156. """
  157. query = db.query(Message).filter(Message.receiver_id == current_user.id)
  158. if unread_only:
  159. query = query.filter(Message.is_read == False)
  160. messages = query.order_by(Message.created_at.desc()).offset(skip).limit(limit).all()
  161. # 处理文件 URL 签名
  162. return [_process_message_content(msg) for msg in messages]
  163. @router.get("/conversations", response_model=List[ConversationResponse])
  164. def get_conversations(
  165. db: Session = Depends(get_db),
  166. current_user: User = Depends(deps.get_current_active_user)
  167. ) -> Any:
  168. """
  169. 获取当前用户的会话列表 (聚合)
  170. """
  171. # 查找所有与我相关的消息
  172. messages = db.query(Message).filter(
  173. or_(
  174. Message.sender_id == current_user.id,
  175. Message.receiver_id == current_user.id
  176. )
  177. ).order_by(Message.created_at.desc()).limit(1000).all()
  178. conversations_map = {}
  179. for msg in messages:
  180. # 确定对话方 (Counterpart)
  181. other_id = None
  182. other_user = None
  183. if msg.type == MessageType.NOTIFICATION:
  184. # 系统通知,归类为一个特殊的 ID 0
  185. other_id = 0
  186. # No user object needed for system
  187. elif msg.sender_id == current_user.id and msg.receiver_id == current_user.id:
  188. # 文件传输助手
  189. other_id = current_user.id
  190. other_user = current_user
  191. elif msg.sender_id == current_user.id:
  192. other_id = msg.receiver_id
  193. other_user = msg.receiver
  194. else:
  195. other_id = msg.sender_id
  196. other_user = msg.sender
  197. # 如果是私信但没找到用户,跳过
  198. if other_id != 0 and not other_user:
  199. continue
  200. # 如果这个对话方还没处理过
  201. if other_id not in conversations_map:
  202. if other_id == 0:
  203. username = "System"
  204. full_name = "系统通知"
  205. else:
  206. username = other_user.mobile # User has mobile, not username
  207. full_name = other_user.name or other_user.english_name or other_user.mobile
  208. conversations_map[other_id] = {
  209. "user_id": other_id,
  210. "username": username,
  211. "full_name": full_name,
  212. "unread_count": 0,
  213. "last_message": msg.content if msg.content_type == ContentType.TEXT else f"[{msg.content_type}]",
  214. "last_message_type": msg.content_type,
  215. "updated_at": msg.created_at
  216. }
  217. # 累加未读数 (只计算接收方是自己的未读消息)
  218. # 注意: 这里的 is_read 是针对接收者的状态
  219. # 即使是自己发送的消息,msg.receiver_id 也不会是自己(除非发给自己)
  220. # 所以这里的判断逻辑是: 如果我是接收者,且未读,则计入未读数
  221. if not msg.is_read and msg.receiver_id == current_user.id:
  222. conversations_map[other_id]["unread_count"] += 1
  223. return list(conversations_map.values())
  224. @router.get("/history/{other_user_id}", response_model=List[MessageResponse])
  225. def get_chat_history(
  226. other_user_id: int,
  227. skip: int = 0,
  228. limit: int = 50,
  229. db: Session = Depends(get_db),
  230. current_user: User = Depends(deps.get_current_active_user)
  231. ) -> Any:
  232. """
  233. 获取与特定用户的聊天记录
  234. """
  235. if other_user_id == 0:
  236. # System Notifications
  237. query = db.query(Message).filter(
  238. Message.receiver_id == current_user.id,
  239. Message.type == MessageType.NOTIFICATION
  240. ).order_by(Message.created_at.desc())
  241. else:
  242. # User Chat
  243. query = db.query(Message).filter(
  244. or_(
  245. and_(Message.sender_id == current_user.id, Message.receiver_id == other_user_id),
  246. and_(Message.sender_id == other_user_id, Message.receiver_id == current_user.id)
  247. )
  248. ).order_by(Message.created_at.desc()) # 最新在前
  249. messages = query.offset(skip).limit(limit).all()
  250. return [_process_message_content(msg) for msg in messages]
  251. @router.get("/unread-count", response_model=int)
  252. def get_unread_count(
  253. db: Session = Depends(get_db),
  254. current_user: User = Depends(deps.get_current_active_user),
  255. ) -> Any:
  256. count = db.query(Message).filter(
  257. Message.receiver_id == current_user.id,
  258. Message.is_read == False
  259. ).count()
  260. return count
  261. @router.put("/{message_id}/read", response_model=MessageResponse)
  262. def mark_as_read(
  263. message_id: int,
  264. db: Session = Depends(get_db),
  265. current_user: User = Depends(deps.get_current_active_user),
  266. ) -> Any:
  267. message = db.query(Message).filter(
  268. Message.id == message_id,
  269. Message.receiver_id == current_user.id
  270. ).first()
  271. if not message:
  272. raise HTTPException(status_code=404, detail="Message not found")
  273. if not message.is_read:
  274. message.is_read = True
  275. message.read_at = datetime.now()
  276. db.add(message)
  277. db.commit()
  278. db.refresh(message)
  279. return _process_message_content(message)
  280. @router.put("/read-all", response_model=dict)
  281. def mark_all_read(
  282. db: Session = Depends(get_db),
  283. current_user: User = Depends(deps.get_current_active_user),
  284. ) -> Any:
  285. now = datetime.now()
  286. result = db.query(Message).filter(
  287. Message.receiver_id == current_user.id,
  288. Message.is_read == False
  289. ).update(
  290. {
  291. "is_read": True,
  292. "read_at": now
  293. },
  294. synchronize_session=False
  295. )
  296. db.commit()
  297. return {"updated_count": result}
  298. @router.delete("/{message_id}", response_model=MessageResponse)
  299. def delete_message(
  300. message_id: int,
  301. db: Session = Depends(get_db),
  302. current_user: User = Depends(deps.get_current_active_user),
  303. ) -> Any:
  304. message = db.query(Message).filter(
  305. Message.id == message_id,
  306. Message.receiver_id == current_user.id
  307. ).first()
  308. if not message:
  309. raise HTTPException(status_code=404, detail="Message not found")
  310. # 先处理返回数据,避免删除后无法访问
  311. processed_msg = _process_message_content(message)
  312. db.delete(message)
  313. db.commit()
  314. return processed_msg