messages.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. from typing import Any, List, Optional
  2. from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
  3. from sqlalchemy.orm import Session, joinedload
  4. from sqlalchemy import or_, and_, desc
  5. from app.core.database import get_db
  6. from app.api.v1 import deps
  7. from app.api.v1.deps import AuthSubject, get_current_user_or_app
  8. from app.models.message import Message, MessageType
  9. from app.models.user import User
  10. from app.models.application import Application
  11. from app.models.mapping import AppUserMapping
  12. from app.schemas.message import MessageCreate, MessageResponse, MessageUpdate, ContentType, ConversationResponse
  13. from app.core.websocket_manager import manager
  14. from app.core.config import settings
  15. from app.core.minio import minio_storage
  16. from app.services.ticket_service import TicketService
  17. from datetime import datetime
  18. from urllib.parse import quote, urlparse, parse_qs, urlencode, urlunparse
  19. import json
  20. router = APIRouter()
  21. def _process_message_content(message: Message) -> MessageResponse:
  22. """处理消息内容,为文件类型生成预签名 URL"""
  23. # Pydantic v2 use model_validate
  24. response = MessageResponse.model_validate(message)
  25. # 填充应用名称
  26. if message.app_id and message.app:
  27. response.app_name = message.app.app_name
  28. if message.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
  29. # 如果内容是对象 Key (不以 http 开头),则生成预签名 URL
  30. # 如果是旧数据的完整 URL,则保持不变 (或视需求处理)
  31. if message.content and not message.content.startswith("http"):
  32. signed_url = minio_storage.get_presigned_url(message.content)
  33. if signed_url:
  34. response.content = signed_url
  35. return response
  36. @router.post("/", response_model=MessageResponse)
  37. async def create_message(
  38. *,
  39. db: Session = Depends(get_db),
  40. message_in: MessageCreate,
  41. current_subject: AuthSubject = Depends(get_current_user_or_app),
  42. background_tasks: BackgroundTasks
  43. ) -> Any:
  44. """
  45. 发送消息 (支持用户私信和应用通知)
  46. 权限:
  47. - 用户:只能发送 MESSAGE
  48. - 应用:可以发送 NOTIFICATION 或 MESSAGE
  49. """
  50. sender_id = None
  51. app_id_int = None # 数据库中的 Integer ID(用于存储到 Message 表)
  52. app_id_str = None # 字符串类型的 app_id(用于 SSO 跳转链接)
  53. # 1. 鉴权与身份识别
  54. if isinstance(current_subject, User):
  55. # 用户发送
  56. if message_in.type == MessageType.NOTIFICATION:
  57. raise HTTPException(status_code=403, detail="普通用户无权发送系统通知")
  58. sender_id = current_subject.id
  59. # 如果用户发送时提供了 app_id(字符串),需要查找对应的应用
  60. if message_in.app_id:
  61. app = db.query(Application).filter(Application.app_id == message_in.app_id).first()
  62. if not app:
  63. raise HTTPException(status_code=404, detail=f"应用未找到: {message_in.app_id}")
  64. app_id_int = app.id
  65. app_id_str = app.app_id
  66. elif isinstance(current_subject, Application):
  67. # 应用发送
  68. app_id_int = current_subject.id
  69. app_id_str = current_subject.app_id
  70. # 安全校验: 如果传入了 app_id(字符串),确保与签名身份一致
  71. if message_in.app_id:
  72. if message_in.app_id != current_subject.app_id:
  73. raise HTTPException(
  74. status_code=403,
  75. detail=f"传入的 app_id ({message_in.app_id}) 与认证应用不匹配"
  76. )
  77. # 使用当前认证应用的 app_id(字符串)
  78. message_in.app_id = app_id_str
  79. # 2. 确定接收者 (Receiver Resolution)
  80. final_receiver_id = None
  81. if message_in.receiver_id:
  82. # 方式 A: 直接指定 User ID
  83. final_receiver_id = message_in.receiver_id
  84. user = db.query(User).filter(User.id == final_receiver_id).first()
  85. if not user:
  86. raise HTTPException(status_code=404, detail="接收用户未找到")
  87. elif message_in.app_user_id and message_in.app_id:
  88. # 方式 B: 通过 App User ID 查找映射
  89. # 注意:如果是用户发送,必须要提供 app_id 才能查映射
  90. # 如果是应用发送,message_in.app_id 已经是字符串类型
  91. # 如果 app_id_int 还没有设置(用户发送且提供了字符串 app_id),需要查找
  92. if app_id_int is None:
  93. app = db.query(Application).filter(Application.app_id == message_in.app_id).first()
  94. if not app:
  95. raise HTTPException(status_code=404, detail=f"应用未找到: {message_in.app_id}")
  96. app_id_int = app.id
  97. app_id_str = app.app_id
  98. mapping = db.query(AppUserMapping).filter(
  99. AppUserMapping.app_id == app_id_int, # 使用 Integer ID 查询映射表
  100. AppUserMapping.mapped_key == message_in.app_user_id
  101. ).first()
  102. if not mapping:
  103. raise HTTPException(
  104. status_code=404,
  105. detail=f"用户映射未找到: App {message_in.app_id}, User {message_in.app_user_id}"
  106. )
  107. final_receiver_id = mapping.user_id
  108. else:
  109. raise HTTPException(status_code=400, detail="必须指定 receiver_id 或 (app_id + app_user_id)")
  110. # 3. 处理 SSO 跳转链接 (Link Generation)
  111. final_action_url = message_in.action_url
  112. if message_in.type == MessageType.NOTIFICATION and message_in.auto_sso and app_id_str and message_in.target_url:
  113. # 生成 jump 接口 URL,用户点击时调用后端接口生成 callback URL
  114. # 格式: {PLATFORM_URL}/api/v1/simple/sso/jump?app_id={APP_ID}&redirect_to={TARGET_URL}
  115. base_url = settings.API_V1_STR # /api/v1
  116. encoded_target = quote(message_in.target_url)
  117. final_action_url = f"{base_url}/simple/sso/jump?app_id={app_id_str}&redirect_to={encoded_target}"
  118. # 处理内容 (如果是文件类型且传入的是 URL,尝试提取 Key)
  119. content_val = message_in.content if isinstance(message_in.content, str) else str(message_in.content)
  120. if message_in.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
  121. # 简单判断: 如果包含 bucket name,可能是 URL
  122. if settings.MINIO_BUCKET_NAME in content_val and "http" in content_val:
  123. try:
  124. # 尝试从 URL 中提取 path
  125. parsed = urlparse(content_val)
  126. path = parsed.path.lstrip('/')
  127. # path 可能是 "bucket_name/object_key"
  128. if path.startswith(settings.MINIO_BUCKET_NAME + "/"):
  129. content_val = path[len(settings.MINIO_BUCKET_NAME)+1:]
  130. except:
  131. pass # 提取失败则保持原样
  132. # 4. 创建消息
  133. message = Message(
  134. sender_id=sender_id,
  135. receiver_id=final_receiver_id,
  136. app_id=app_id_int, # 使用 Integer ID 存储到数据库
  137. type=message_in.type,
  138. content_type=message_in.content_type,
  139. title=message_in.title,
  140. content=content_val,
  141. action_url=final_action_url,
  142. action_text=message_in.action_text
  143. )
  144. db.add(message)
  145. db.commit()
  146. db.refresh(message)
  147. # 5. 触发实时推送 (WebSocket)
  148. # 处理用于推送的消息内容 (签名)
  149. processed_msg = _process_message_content(message)
  150. push_payload = {
  151. "type": "NEW_MESSAGE",
  152. "data": {
  153. "id": processed_msg.id,
  154. "type": processed_msg.type,
  155. "content_type": processed_msg.content_type,
  156. "title": processed_msg.title,
  157. "content": processed_msg.content, # 使用签名后的 URL
  158. "action_url": processed_msg.action_url,
  159. "action_text": processed_msg.action_text,
  160. "sender_name": "系统通知" if not sender_id else "用户私信", # 简化处理
  161. "sender_id": sender_id, # Add sender_id for frontend to decide left/right
  162. "created_at": str(processed_msg.created_at)
  163. }
  164. }
  165. # 使用后台任务发送 WS 消息,避免阻塞 HTTP 响应
  166. # 如果是发给自己,receiver_id == sender_id,ws 会收到一次
  167. background_tasks.add_task(manager.send_personal_message, push_payload, final_receiver_id)
  168. return processed_msg
  169. @router.get("/", response_model=List[MessageResponse])
  170. def read_messages(
  171. db: Session = Depends(get_db),
  172. skip: int = 0,
  173. limit: int = 100,
  174. unread_only: bool = False,
  175. current_user: User = Depends(deps.get_current_active_user),
  176. ) -> Any:
  177. """
  178. 获取当前用户的消息列表 (所有历史记录)
  179. """
  180. query = db.query(Message).options(joinedload(Message.app)).filter(Message.receiver_id == current_user.id)
  181. if unread_only:
  182. query = query.filter(Message.is_read == False)
  183. messages = query.order_by(Message.created_at.desc()).offset(skip).limit(limit).all()
  184. # 处理文件 URL 签名
  185. return [_process_message_content(msg) for msg in messages]
  186. @router.get("/conversations", response_model=List[ConversationResponse])
  187. def get_conversations(
  188. db: Session = Depends(get_db),
  189. current_user: User = Depends(deps.get_current_active_user)
  190. ) -> Any:
  191. """
  192. 获取当前用户的会话列表 (聚合)
  193. """
  194. # 查找所有与我相关的消息
  195. messages = db.query(Message).filter(
  196. or_(
  197. Message.sender_id == current_user.id,
  198. Message.receiver_id == current_user.id
  199. )
  200. ).order_by(Message.created_at.desc()).limit(1000).all()
  201. conversations_map = {}
  202. for msg in messages:
  203. # 确定对话方 (Counterpart)
  204. other_id = None
  205. other_user = None
  206. if msg.type == MessageType.NOTIFICATION:
  207. # 系统通知,归类为一个特殊的 ID 0
  208. other_id = 0
  209. # No user object needed for system
  210. elif msg.sender_id == current_user.id and msg.receiver_id == current_user.id:
  211. # 文件传输助手
  212. other_id = current_user.id
  213. other_user = current_user
  214. elif msg.sender_id == current_user.id:
  215. other_id = msg.receiver_id
  216. other_user = msg.receiver
  217. else:
  218. other_id = msg.sender_id
  219. other_user = msg.sender
  220. # 如果是私信但没找到用户,跳过
  221. if other_id != 0 and not other_user:
  222. continue
  223. # 如果这个对话方还没处理过
  224. if other_id not in conversations_map:
  225. if other_id == 0:
  226. username = "System"
  227. full_name = "系统通知"
  228. else:
  229. username = other_user.mobile # User has mobile, not username
  230. full_name = other_user.name or other_user.english_name or other_user.mobile
  231. conversations_map[other_id] = {
  232. "user_id": other_id,
  233. "username": username,
  234. "full_name": full_name,
  235. "unread_count": 0,
  236. "last_message": msg.content if msg.content_type == ContentType.TEXT else f"[{msg.content_type}]",
  237. "last_message_type": msg.content_type,
  238. "updated_at": msg.created_at
  239. }
  240. # 累加未读数 (只计算接收方是自己的未读消息)
  241. # 注意: 这里的 is_read 是针对接收者的状态
  242. # 即使是自己发送的消息,msg.receiver_id 也不会是自己(除非发给自己)
  243. # 所以这里的判断逻辑是: 如果我是接收者,且未读,则计入未读数
  244. if not msg.is_read and msg.receiver_id == current_user.id:
  245. conversations_map[other_id]["unread_count"] += 1
  246. return list(conversations_map.values())
  247. @router.get("/history/{other_user_id}", response_model=List[MessageResponse])
  248. def get_chat_history(
  249. other_user_id: int,
  250. skip: int = 0,
  251. limit: int = 50,
  252. db: Session = Depends(get_db),
  253. current_user: User = Depends(deps.get_current_active_user)
  254. ) -> Any:
  255. """
  256. 获取与特定用户的聊天记录
  257. """
  258. if other_user_id == 0:
  259. # System Notifications
  260. query = db.query(Message).options(joinedload(Message.app)).filter(
  261. Message.receiver_id == current_user.id,
  262. Message.type == MessageType.NOTIFICATION
  263. ).order_by(Message.created_at.desc())
  264. else:
  265. # User Chat
  266. query = db.query(Message).options(joinedload(Message.app)).filter(
  267. or_(
  268. and_(Message.sender_id == current_user.id, Message.receiver_id == other_user_id),
  269. and_(Message.sender_id == other_user_id, Message.receiver_id == current_user.id)
  270. )
  271. ).order_by(Message.created_at.desc()) # 最新在前
  272. messages = query.offset(skip).limit(limit).all()
  273. return [_process_message_content(msg) for msg in messages]
  274. @router.get("/unread-count", response_model=int)
  275. def get_unread_count(
  276. db: Session = Depends(get_db),
  277. current_user: User = Depends(deps.get_current_active_user),
  278. ) -> Any:
  279. count = db.query(Message).filter(
  280. Message.receiver_id == current_user.id,
  281. Message.is_read == False
  282. ).count()
  283. return count
  284. @router.put("/{message_id}/read", response_model=MessageResponse)
  285. def mark_as_read(
  286. message_id: int,
  287. db: Session = Depends(get_db),
  288. current_user: User = Depends(deps.get_current_active_user),
  289. ) -> Any:
  290. message = db.query(Message).filter(
  291. Message.id == message_id,
  292. Message.receiver_id == current_user.id
  293. ).first()
  294. if not message:
  295. raise HTTPException(status_code=404, detail="Message not found")
  296. if not message.is_read:
  297. message.is_read = True
  298. message.read_at = datetime.now()
  299. db.add(message)
  300. db.commit()
  301. db.refresh(message)
  302. return _process_message_content(message)
  303. @router.put("/read-all", response_model=dict)
  304. def mark_all_read(
  305. db: Session = Depends(get_db),
  306. current_user: User = Depends(deps.get_current_active_user),
  307. ) -> Any:
  308. now = datetime.now()
  309. result = db.query(Message).filter(
  310. Message.receiver_id == current_user.id,
  311. Message.is_read == False
  312. ).update(
  313. {
  314. "is_read": True,
  315. "read_at": now
  316. },
  317. synchronize_session=False
  318. )
  319. db.commit()
  320. return {"updated_count": result}
  321. @router.delete("/{message_id}", response_model=MessageResponse)
  322. def delete_message(
  323. message_id: int,
  324. db: Session = Depends(get_db),
  325. current_user: User = Depends(deps.get_current_active_user),
  326. ) -> Any:
  327. message = db.query(Message).filter(
  328. Message.id == message_id,
  329. Message.receiver_id == current_user.id
  330. ).first()
  331. if not message:
  332. raise HTTPException(status_code=404, detail="Message not found")
  333. # 先处理返回数据,避免删除后无法访问
  334. processed_msg = _process_message_content(message)
  335. db.delete(message)
  336. db.commit()
  337. return processed_msg
  338. @router.get("/{message_id}/callback-url", response_model=dict)
  339. def get_message_callback_url(
  340. message_id: int,
  341. db: Session = Depends(get_db),
  342. current_user: User = Depends(deps.get_current_active_user),
  343. ) -> Any:
  344. """
  345. 获取消息的 callback URL(用于通知按钮点击)
  346. 内部执行 jump 接口的逻辑,实时生成 ticket 和 callback URL
  347. """
  348. # 1. 获取消息
  349. message = db.query(Message).filter(Message.id == message_id).first()
  350. if not message:
  351. raise HTTPException(status_code=404, detail="消息未找到")
  352. # 2. 验证权限:只有接收者可以获取
  353. if message.receiver_id != current_user.id:
  354. raise HTTPException(status_code=403, detail="无权访问此消息")
  355. # 3. 检查是否有 action_url(jump 接口 URL)
  356. if not message.action_url:
  357. raise HTTPException(status_code=400, detail="此消息没有配置跳转链接")
  358. # 4. 解析 action_url,提取 app_id 和 redirect_to
  359. # action_url 格式: /api/v1/simple/sso/jump?app_id=xxx&redirect_to=xxx
  360. parsed = urlparse(message.action_url)
  361. if not parsed.path.endswith("/sso/jump"):
  362. raise HTTPException(status_code=400, detail="无效的跳转链接格式")
  363. query_params = parse_qs(parsed.query)
  364. app_id = query_params.get("app_id", [None])[0]
  365. redirect_to = query_params.get("redirect_to", [None])[0]
  366. if not app_id or not redirect_to:
  367. raise HTTPException(status_code=400, detail="跳转链接参数不完整")
  368. # 5. 执行 jump 接口的逻辑(但不返回 RedirectResponse,而是返回 JSON)
  369. app = db.query(Application).filter(Application.app_id == app_id).first()
  370. if not app:
  371. raise HTTPException(status_code=404, detail="应用未找到")
  372. # 6. 生成 Ticket(使用当前登录用户)
  373. ticket = TicketService.generate_ticket(current_user.id, app_id)
  374. # 7. 获取应用回调地址
  375. redirect_base = ""
  376. if app.redirect_uris:
  377. try:
  378. uris = json.loads(app.redirect_uris)
  379. if isinstance(uris, list) and len(uris) > 0:
  380. redirect_base = uris[0]
  381. elif isinstance(uris, str):
  382. redirect_base = uris
  383. except (json.JSONDecodeError, TypeError):
  384. redirect_base = app.redirect_uris.strip()
  385. if not redirect_base:
  386. raise HTTPException(status_code=400, detail="应用未配置回调地址")
  387. # 8. 构造最终 callback URL
  388. parsed_uri = urlparse(redirect_base)
  389. callback_query_params = parse_qs(parsed_uri.query)
  390. callback_query_params['ticket'] = [ticket]
  391. callback_query_params['next'] = [redirect_to]
  392. new_query = urlencode(callback_query_params, doseq=True)
  393. callback_url = urlunparse((
  394. parsed_uri.scheme,
  395. parsed_uri.netloc,
  396. parsed_uri.path,
  397. parsed_uri.params,
  398. new_query,
  399. parsed_uri.fragment
  400. ))
  401. return {"callback_url": callback_url}