messages.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. from typing import Any, List, Optional
  2. from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
  3. from sqlalchemy.orm import Session, joinedload
  4. from sqlalchemy import or_, and_, desc
  5. from app.core.database import get_db
  6. from app.api.v1 import deps
  7. from app.api.v1.deps import AuthSubject, get_current_user_or_app
  8. from app.models.message import Message, MessageType
  9. from app.models.user import User
  10. from app.models.application import Application
  11. from app.models.mapping import AppUserMapping
  12. from app.schemas.message import MessageCreate, MessageResponse, MessageUpdate, ContentType, ConversationResponse
  13. from app.core.websocket_manager import manager
  14. from app.core.config import settings
  15. from app.core.minio import minio_storage
  16. from datetime import datetime
  17. from urllib.parse import quote, urlparse
  18. router = APIRouter()
  19. def _process_message_content(message: Message) -> MessageResponse:
  20. """处理消息内容,为文件类型生成预签名 URL"""
  21. # Pydantic v2 use model_validate
  22. response = MessageResponse.model_validate(message)
  23. # 填充应用名称
  24. if message.app_id and message.app:
  25. response.app_name = message.app.app_name
  26. if message.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
  27. # 如果内容是对象 Key (不以 http 开头),则生成预签名 URL
  28. # 如果是旧数据的完整 URL,则保持不变 (或视需求处理)
  29. if message.content and not message.content.startswith("http"):
  30. signed_url = minio_storage.get_presigned_url(message.content)
  31. if signed_url:
  32. response.content = signed_url
  33. return response
  34. @router.post("/", response_model=MessageResponse)
  35. async def create_message(
  36. *,
  37. db: Session = Depends(get_db),
  38. message_in: MessageCreate,
  39. current_subject: AuthSubject = Depends(get_current_user_or_app),
  40. background_tasks: BackgroundTasks
  41. ) -> Any:
  42. """
  43. 发送消息 (支持用户私信和应用通知)
  44. 权限:
  45. - 用户:只能发送 MESSAGE
  46. - 应用:可以发送 NOTIFICATION 或 MESSAGE
  47. """
  48. sender_id = None
  49. app_id = None
  50. # 1. 鉴权与身份识别
  51. if isinstance(current_subject, User):
  52. # 用户发送
  53. if message_in.type == MessageType.NOTIFICATION:
  54. raise HTTPException(status_code=403, detail="普通用户无权发送系统通知")
  55. sender_id = current_subject.id
  56. elif isinstance(current_subject, Application):
  57. # 应用发送
  58. app_id = current_subject.id
  59. # 安全校验: 确保传入的 app_id (如果有) 与签名身份一致
  60. if message_in.app_id and message_in.app_id != current_subject.id:
  61. # 这里我们选择忽略传入的 app_id,强制使用当前认证的应用 ID
  62. pass
  63. message_in.app_id = app_id
  64. # 2. 确定接收者 (Receiver Resolution)
  65. final_receiver_id = None
  66. if message_in.receiver_id:
  67. # 方式 A: 直接指定 User ID
  68. final_receiver_id = message_in.receiver_id
  69. user = db.query(User).filter(User.id == final_receiver_id).first()
  70. if not user:
  71. raise HTTPException(status_code=404, detail="接收用户未找到")
  72. elif message_in.app_user_id and message_in.app_id:
  73. # 方式 B: 通过 App User ID 查找映射
  74. # 注意:如果是用户发送,必须要提供 app_id 才能查映射
  75. # 如果是应用发送,message_in.app_id 已经被赋值为 current_subject.id
  76. mapping = db.query(AppUserMapping).filter(
  77. AppUserMapping.app_id == message_in.app_id,
  78. AppUserMapping.mapped_key == message_in.app_user_id
  79. ).first()
  80. if not mapping:
  81. raise HTTPException(
  82. status_code=404,
  83. detail=f"用户映射未找到: App {message_in.app_id}, User {message_in.app_user_id}"
  84. )
  85. final_receiver_id = mapping.user_id
  86. else:
  87. raise HTTPException(status_code=400, detail="必须指定 receiver_id 或 (app_id + app_user_id)")
  88. # 3. 处理 SSO 跳转链接 (Link Generation)
  89. final_action_url = message_in.action_url
  90. if message_in.type == MessageType.NOTIFICATION and message_in.auto_sso and message_in.app_id and message_in.target_url:
  91. # 构造 SSO 中转链接
  92. # 格式: {PLATFORM_URL}/api/v1/auth/sso/jump?app_id={APP_ID}&redirect_to={TARGET_URL}
  93. # 假设 settings.SERVER_HOST 配置了当前服务地址,如果没有则使用相对路径或默认值
  94. # 这里为了演示,假设前端或API base url
  95. base_url = settings.API_V1_STR # /api/v1
  96. encoded_target = quote(message_in.target_url)
  97. final_action_url = f"{base_url}/auth/sso/jump?app_id={message_in.app_id}&redirect_to={encoded_target}"
  98. # 处理内容 (如果是文件类型且传入的是 URL,尝试提取 Key)
  99. content_val = message_in.content if isinstance(message_in.content, str) else str(message_in.content)
  100. if message_in.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
  101. # 简单判断: 如果包含 bucket name,可能是 URL
  102. if settings.MINIO_BUCKET_NAME in content_val and "http" in content_val:
  103. try:
  104. # 尝试从 URL 中提取 path
  105. parsed = urlparse(content_val)
  106. path = parsed.path.lstrip('/')
  107. # path 可能是 "bucket_name/object_key"
  108. if path.startswith(settings.MINIO_BUCKET_NAME + "/"):
  109. content_val = path[len(settings.MINIO_BUCKET_NAME)+1:]
  110. except:
  111. pass # 提取失败则保持原样
  112. # 4. 创建消息
  113. message = Message(
  114. sender_id=sender_id,
  115. receiver_id=final_receiver_id,
  116. app_id=message_in.app_id,
  117. type=message_in.type,
  118. content_type=message_in.content_type,
  119. title=message_in.title,
  120. content=content_val,
  121. action_url=final_action_url,
  122. action_text=message_in.action_text
  123. )
  124. db.add(message)
  125. db.commit()
  126. db.refresh(message)
  127. # 5. 触发实时推送 (WebSocket)
  128. # 处理用于推送的消息内容 (签名)
  129. processed_msg = _process_message_content(message)
  130. push_payload = {
  131. "type": "NEW_MESSAGE",
  132. "data": {
  133. "id": processed_msg.id,
  134. "type": processed_msg.type,
  135. "content_type": processed_msg.content_type,
  136. "title": processed_msg.title,
  137. "content": processed_msg.content, # 使用签名后的 URL
  138. "action_url": processed_msg.action_url,
  139. "action_text": processed_msg.action_text,
  140. "sender_name": "系统通知" if not sender_id else "用户私信", # 简化处理
  141. "sender_id": sender_id, # Add sender_id for frontend to decide left/right
  142. "created_at": str(processed_msg.created_at)
  143. }
  144. }
  145. # 使用后台任务发送 WS 消息,避免阻塞 HTTP 响应
  146. # 如果是发给自己,receiver_id == sender_id,ws 会收到一次
  147. background_tasks.add_task(manager.send_personal_message, push_payload, final_receiver_id)
  148. return processed_msg
  149. @router.get("/", response_model=List[MessageResponse])
  150. def read_messages(
  151. db: Session = Depends(get_db),
  152. skip: int = 0,
  153. limit: int = 100,
  154. unread_only: bool = False,
  155. current_user: User = Depends(deps.get_current_active_user),
  156. ) -> Any:
  157. """
  158. 获取当前用户的消息列表 (所有历史记录)
  159. """
  160. query = db.query(Message).options(joinedload(Message.app)).filter(Message.receiver_id == current_user.id)
  161. if unread_only:
  162. query = query.filter(Message.is_read == False)
  163. messages = query.order_by(Message.created_at.desc()).offset(skip).limit(limit).all()
  164. # 处理文件 URL 签名
  165. return [_process_message_content(msg) for msg in messages]
  166. @router.get("/conversations", response_model=List[ConversationResponse])
  167. def get_conversations(
  168. db: Session = Depends(get_db),
  169. current_user: User = Depends(deps.get_current_active_user)
  170. ) -> Any:
  171. """
  172. 获取当前用户的会话列表 (聚合)
  173. """
  174. # 查找所有与我相关的消息
  175. messages = db.query(Message).filter(
  176. or_(
  177. Message.sender_id == current_user.id,
  178. Message.receiver_id == current_user.id
  179. )
  180. ).order_by(Message.created_at.desc()).limit(1000).all()
  181. conversations_map = {}
  182. for msg in messages:
  183. # 确定对话方 (Counterpart)
  184. other_id = None
  185. other_user = None
  186. if msg.type == MessageType.NOTIFICATION:
  187. # 系统通知,归类为一个特殊的 ID 0
  188. other_id = 0
  189. # No user object needed for system
  190. elif msg.sender_id == current_user.id and msg.receiver_id == current_user.id:
  191. # 文件传输助手
  192. other_id = current_user.id
  193. other_user = current_user
  194. elif msg.sender_id == current_user.id:
  195. other_id = msg.receiver_id
  196. other_user = msg.receiver
  197. else:
  198. other_id = msg.sender_id
  199. other_user = msg.sender
  200. # 如果是私信但没找到用户,跳过
  201. if other_id != 0 and not other_user:
  202. continue
  203. # 如果这个对话方还没处理过
  204. if other_id not in conversations_map:
  205. if other_id == 0:
  206. username = "System"
  207. full_name = "系统通知"
  208. else:
  209. username = other_user.mobile # User has mobile, not username
  210. full_name = other_user.name or other_user.english_name or other_user.mobile
  211. conversations_map[other_id] = {
  212. "user_id": other_id,
  213. "username": username,
  214. "full_name": full_name,
  215. "unread_count": 0,
  216. "last_message": msg.content if msg.content_type == ContentType.TEXT else f"[{msg.content_type}]",
  217. "last_message_type": msg.content_type,
  218. "updated_at": msg.created_at
  219. }
  220. # 累加未读数 (只计算接收方是自己的未读消息)
  221. # 注意: 这里的 is_read 是针对接收者的状态
  222. # 即使是自己发送的消息,msg.receiver_id 也不会是自己(除非发给自己)
  223. # 所以这里的判断逻辑是: 如果我是接收者,且未读,则计入未读数
  224. if not msg.is_read and msg.receiver_id == current_user.id:
  225. conversations_map[other_id]["unread_count"] += 1
  226. return list(conversations_map.values())
  227. @router.get("/history/{other_user_id}", response_model=List[MessageResponse])
  228. def get_chat_history(
  229. other_user_id: int,
  230. skip: int = 0,
  231. limit: int = 50,
  232. db: Session = Depends(get_db),
  233. current_user: User = Depends(deps.get_current_active_user)
  234. ) -> Any:
  235. """
  236. 获取与特定用户的聊天记录
  237. """
  238. if other_user_id == 0:
  239. # System Notifications
  240. query = db.query(Message).options(joinedload(Message.app)).filter(
  241. Message.receiver_id == current_user.id,
  242. Message.type == MessageType.NOTIFICATION
  243. ).order_by(Message.created_at.desc())
  244. else:
  245. # User Chat
  246. query = db.query(Message).options(joinedload(Message.app)).filter(
  247. or_(
  248. and_(Message.sender_id == current_user.id, Message.receiver_id == other_user_id),
  249. and_(Message.sender_id == other_user_id, Message.receiver_id == current_user.id)
  250. )
  251. ).order_by(Message.created_at.desc()) # 最新在前
  252. messages = query.offset(skip).limit(limit).all()
  253. return [_process_message_content(msg) for msg in messages]
  254. @router.get("/unread-count", response_model=int)
  255. def get_unread_count(
  256. db: Session = Depends(get_db),
  257. current_user: User = Depends(deps.get_current_active_user),
  258. ) -> Any:
  259. count = db.query(Message).filter(
  260. Message.receiver_id == current_user.id,
  261. Message.is_read == False
  262. ).count()
  263. return count
  264. @router.put("/{message_id}/read", response_model=MessageResponse)
  265. def mark_as_read(
  266. message_id: int,
  267. db: Session = Depends(get_db),
  268. current_user: User = Depends(deps.get_current_active_user),
  269. ) -> Any:
  270. message = db.query(Message).filter(
  271. Message.id == message_id,
  272. Message.receiver_id == current_user.id
  273. ).first()
  274. if not message:
  275. raise HTTPException(status_code=404, detail="Message not found")
  276. if not message.is_read:
  277. message.is_read = True
  278. message.read_at = datetime.now()
  279. db.add(message)
  280. db.commit()
  281. db.refresh(message)
  282. return _process_message_content(message)
  283. @router.put("/read-all", response_model=dict)
  284. def mark_all_read(
  285. db: Session = Depends(get_db),
  286. current_user: User = Depends(deps.get_current_active_user),
  287. ) -> Any:
  288. now = datetime.now()
  289. result = db.query(Message).filter(
  290. Message.receiver_id == current_user.id,
  291. Message.is_read == False
  292. ).update(
  293. {
  294. "is_read": True,
  295. "read_at": now
  296. },
  297. synchronize_session=False
  298. )
  299. db.commit()
  300. return {"updated_count": result}
  301. @router.delete("/{message_id}", response_model=MessageResponse)
  302. def delete_message(
  303. message_id: int,
  304. db: Session = Depends(get_db),
  305. current_user: User = Depends(deps.get_current_active_user),
  306. ) -> Any:
  307. message = db.query(Message).filter(
  308. Message.id == message_id,
  309. Message.receiver_id == current_user.id
  310. ).first()
  311. if not message:
  312. raise HTTPException(status_code=404, detail="Message not found")
  313. # 先处理返回数据,避免删除后无法访问
  314. processed_msg = _process_message_content(message)
  315. db.delete(message)
  316. db.commit()
  317. return processed_msg