# messages.py (26 KB)
  1. from typing import Any, List, Optional
  2. from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
  3. from sqlalchemy.orm import Session, joinedload
  4. from sqlalchemy import or_, and_, desc
  5. from app.core.database import get_db
  6. from app.api.v1 import deps
  7. from app.api.v1.deps import AuthSubject, get_current_user_or_app
  8. from app.models.message import Message, MessageType
  9. from app.models.user import User
  10. from app.models.application import Application
  11. from app.models.mapping import AppUserMapping
  12. from app.schemas.message import MessageCreate, MessageResponse, MessageUpdate, ContentType, ConversationResponse
  13. from app.core.websocket_manager import manager
  14. from app.core.config import settings
  15. from app.core.minio import minio_storage
  16. from app.services.ticket_service import TicketService
  17. from datetime import datetime
  18. from urllib.parse import quote, urlparse, parse_qs, urlencode, urlunparse
  19. import json
  20. router = APIRouter()
  21. def _conversation_last_preview(msg: Message) -> str:
  22. """会话列表「最后一条」预览文案:用户通知类型展示 title,便于列表识别。"""
  23. ct = msg.content_type
  24. cv = ct.value if hasattr(ct, "value") else str(ct)
  25. if cv == ContentType.USER_NOTIFICATION.value:
  26. t = (msg.title or "").strip()
  27. return t if t else "申请通知"
  28. if cv == ContentType.TEXT.value:
  29. return msg.content or ""
  30. return f"[{cv}]"
  31. def _process_message_content(message: Message) -> MessageResponse:
  32. """处理消息内容,为文件类型生成预签名 URL"""
  33. # Pydantic v2 use model_validate
  34. response = MessageResponse.model_validate(message)
  35. # 填充应用名称
  36. if message.app_id and message.app:
  37. response.app_name = message.app.app_name
  38. if message.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
  39. # 如果内容是对象 Key (不以 http 开头),则生成预签名 URL
  40. # 如果是旧数据的完整 URL,则保持不变 (或视需求处理)
  41. if message.content and not message.content.startswith("http"):
  42. signed_url = minio_storage.get_presigned_url(message.content)
  43. if signed_url:
  44. response.content = signed_url
  45. return response
  46. @router.post("/", response_model=MessageResponse)
  47. async def create_message(
  48. *,
  49. db: Session = Depends(get_db),
  50. message_in: MessageCreate,
  51. current_subject: AuthSubject = Depends(get_current_user_or_app),
  52. background_tasks: BackgroundTasks
  53. ) -> Any:
  54. """
  55. 发送消息 (支持用户私信和应用通知)
  56. 权限:
  57. - 用户:只能发送 MESSAGE
  58. - 应用:可以发送 NOTIFICATION 或 MESSAGE
  59. """
  60. sender_id = None
  61. app_id_int = None # 数据库中的 Integer ID(用于存储到 Message 表)
  62. app_id_str = None # 字符串类型的 app_id(用于 SSO 跳转链接)
  63. # 1. 鉴权与身份识别
  64. if isinstance(current_subject, User):
  65. # 用户发送
  66. if message_in.type == MessageType.NOTIFICATION:
  67. raise HTTPException(status_code=403, detail="普通用户无权发送系统通知")
  68. sender_id = current_subject.id
  69. # 如果用户发送时提供了 app_id(字符串),需要查找对应的应用
  70. if message_in.app_id:
  71. app = db.query(Application).filter(Application.app_id == message_in.app_id).first()
  72. if not app:
  73. raise HTTPException(status_code=404, detail=f"应用未找到: {message_in.app_id}")
  74. app_id_int = app.id
  75. app_id_str = app.app_id
  76. elif isinstance(current_subject, Application):
  77. # 应用发送
  78. app_id_int = current_subject.id
  79. app_id_str = current_subject.app_id
  80. # 安全校验: 如果传入了 app_id(字符串),确保与签名身份一致
  81. if message_in.app_id:
  82. if message_in.app_id != current_subject.app_id:
  83. raise HTTPException(
  84. status_code=403,
  85. detail=f"传入的 app_id ({message_in.app_id}) 与认证应用不匹配"
  86. )
  87. # 使用当前认证应用的 app_id(字符串)
  88. message_in.app_id = app_id_str
  89. # 应用发信时可选:解析 sender_app_user_id → sender_id(不落库,只写 sender_id)
  90. sender_app_user_id = getattr(message_in, "sender_app_user_id", None)
  91. if sender_app_user_id:
  92. sender_mapping = db.query(AppUserMapping).filter(
  93. AppUserMapping.app_id == app_id_int,
  94. AppUserMapping.mapped_key == sender_app_user_id
  95. ).first()
  96. if not sender_mapping:
  97. raise HTTPException(
  98. status_code=404,
  99. detail=f"发起人映射未找到: App {message_in.app_id}, sender_app_user_id={sender_app_user_id}"
  100. )
  101. sender_id = sender_mapping.user_id
  102. # 2. 确定接收者 (Receiver Resolution)
  103. final_receiver_id = None
  104. is_broadcast = getattr(message_in, "is_broadcast", False)
  105. if is_broadcast:
  106. if message_in.type != MessageType.NOTIFICATION:
  107. raise HTTPException(status_code=403, detail="广播模式仅支持系统通知 (NOTIFICATION)")
  108. elif message_in.receiver_id:
  109. # 方式 A: 直接指定 User ID
  110. final_receiver_id = message_in.receiver_id
  111. user = db.query(User).filter(User.id == final_receiver_id).first()
  112. if not user:
  113. raise HTTPException(status_code=404, detail="接收用户未找到")
  114. elif message_in.app_user_id and message_in.app_id:
  115. # 方式 B: 通过 App User ID 查找映射
  116. # 注意:如果是用户发送,必须要提供 app_id 才能查映射
  117. # 如果是应用发送,message_in.app_id 已经是字符串类型
  118. # 如果 app_id_int 还没有设置(用户发送且提供了字符串 app_id),需要查找
  119. if app_id_int is None:
  120. app = db.query(Application).filter(Application.app_id == message_in.app_id).first()
  121. if not app:
  122. raise HTTPException(status_code=404, detail=f"应用未找到: {message_in.app_id}")
  123. app_id_int = app.id
  124. app_id_str = app.app_id
  125. mapping = db.query(AppUserMapping).filter(
  126. AppUserMapping.app_id == app_id_int, # 使用 Integer ID 查询映射表
  127. AppUserMapping.mapped_key == message_in.app_user_id
  128. ).first()
  129. if not mapping:
  130. raise HTTPException(
  131. status_code=404,
  132. detail=f"用户映射未找到: App {message_in.app_id}, User {message_in.app_user_id}"
  133. )
  134. final_receiver_id = mapping.user_id
  135. else:
  136. raise HTTPException(status_code=400, detail="必须指定 receiver_id 或 (app_id + app_user_id),或者设置 is_broadcast=True")
  137. # 3. 处理 SSO 跳转链接 (Link Generation)
  138. final_action_url = message_in.action_url
  139. if (
  140. message_in.auto_sso
  141. and app_id_str
  142. and message_in.target_url
  143. and (
  144. message_in.type == MessageType.NOTIFICATION
  145. or message_in.content_type == ContentType.USER_NOTIFICATION
  146. )
  147. ):
  148. # 生成 jump 接口 URL,用户点击时调用后端接口生成 callback URL
  149. # 格式: {PLATFORM_URL}/api/v1/simple/sso/jump?app_id={APP_ID}&redirect_to={TARGET_URL}
  150. base_url = settings.API_V1_STR # /api/v1
  151. encoded_target = quote(message_in.target_url)
  152. final_action_url = f"{base_url}/simple/sso/jump?app_id={app_id_str}&redirect_to={encoded_target}"
  153. # 处理内容 (如果是文件类型且传入的是 URL,尝试提取 Key)
  154. # 注意:前端可能会对 content 做 JSON.parse(如 USER_NOTIFICATION)。
  155. # 如果请求体传的是对象/dict,这里应当序列化为合法 JSON 字符串,而不是 str(dict)。
  156. if isinstance(message_in.content, str):
  157. content_val = message_in.content
  158. else:
  159. try:
  160. content_val = json.dumps(message_in.content, ensure_ascii=False, default=str)
  161. except Exception:
  162. content_val = str(message_in.content)
  163. if message_in.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
  164. # 简单判断: 如果包含 bucket name,可能是 URL
  165. if settings.MINIO_BUCKET_NAME in content_val and "http" in content_val:
  166. try:
  167. # 尝试从 URL 中提取 path
  168. parsed = urlparse(content_val)
  169. path = parsed.path.lstrip('/')
  170. # path 可能是 "bucket_name/object_key"
  171. if path.startswith(settings.MINIO_BUCKET_NAME + "/"):
  172. content_val = path[len(settings.MINIO_BUCKET_NAME)+1:]
  173. except:
  174. pass # 提取失败则保持原样
  175. # 4. 创建消息
  176. if is_broadcast:
  177. # 查找所有活跃用户 (is_deleted=0)
  178. active_users = db.query(User.id).filter(
  179. User.status == "ACTIVE",
  180. User.is_deleted == 0
  181. ).all()
  182. if not active_users:
  183. raise HTTPException(status_code=400, detail="没有可发送的活跃用户")
  184. messages_to_insert = []
  185. for u in active_users:
  186. messages_to_insert.append(
  187. Message(
  188. sender_id=sender_id,
  189. receiver_id=u.id,
  190. app_id=app_id_int,
  191. type=message_in.type,
  192. content_type=message_in.content_type,
  193. title=message_in.title,
  194. content=content_val,
  195. action_url=final_action_url,
  196. action_text=message_in.action_text
  197. )
  198. )
  199. db.add_all(messages_to_insert)
  200. db.commit()
  201. # 使用第一条消息进行签名作为接口的返回值
  202. db.refresh(messages_to_insert[0])
  203. processed_msg = _process_message_content(messages_to_insert[0])
  204. # 提取推送所需数据,避免在异步任务中触发延迟加载
  205. push_tasks = [{"id": msg.id, "receiver_id": msg.receiver_id} for msg in messages_to_insert]
  206. async def send_broadcast_ws():
  207. for t in push_tasks:
  208. push_payload = {
  209. "type": "NEW_MESSAGE",
  210. "data": {
  211. "id": t["id"],
  212. "type": processed_msg.type,
  213. "content_type": processed_msg.content_type,
  214. "title": processed_msg.title,
  215. "content": processed_msg.content,
  216. "action_url": processed_msg.action_url,
  217. "action_text": processed_msg.action_text,
  218. "sender_name": "系统通知" if not sender_id else "用户私信",
  219. "sender_id": sender_id,
  220. "created_at": str(processed_msg.created_at),
  221. "app_id": processed_msg.app_id,
  222. "app_name": processed_msg.app_name,
  223. }
  224. }
  225. await manager.send_personal_message(push_payload, t["receiver_id"])
  226. background_tasks.add_task(send_broadcast_ws)
  227. return processed_msg
  228. else:
  229. message = Message(
  230. sender_id=sender_id,
  231. receiver_id=final_receiver_id,
  232. app_id=app_id_int, # 使用 Integer ID 存储到数据库
  233. type=message_in.type,
  234. content_type=message_in.content_type,
  235. title=message_in.title,
  236. content=content_val,
  237. action_url=final_action_url,
  238. action_text=message_in.action_text
  239. )
  240. db.add(message)
  241. db.commit()
  242. db.refresh(message)
  243. # 5. 触发实时推送 (WebSocket)
  244. # 处理用于推送的消息内容 (签名)
  245. processed_msg = _process_message_content(message)
  246. push_payload = {
  247. "type": "NEW_MESSAGE",
  248. "data": {
  249. "id": processed_msg.id,
  250. "type": processed_msg.type,
  251. "content_type": processed_msg.content_type,
  252. "title": processed_msg.title,
  253. "content": processed_msg.content, # 使用签名后的 URL
  254. "action_url": processed_msg.action_url,
  255. "action_text": processed_msg.action_text,
  256. "sender_name": "系统通知" if not sender_id else "用户私信", # 简化处理
  257. "sender_id": sender_id, # Add sender_id for frontend to decide left/right
  258. "created_at": str(processed_msg.created_at),
  259. # 附加应用信息,便于前端按应用拆分系统通知会话
  260. "app_id": message.app_id,
  261. "app_name": message.app.app_name if message.app else None,
  262. }
  263. }
  264. # 使用后台任务发送 WS 消息,避免阻塞 HTTP 响应
  265. # 如果是发给自己,receiver_id == sender_id,ws 会收到一次
  266. background_tasks.add_task(manager.send_personal_message, push_payload, final_receiver_id)
  267. return processed_msg
  268. @router.get("/", response_model=List[MessageResponse])
  269. def read_messages(
  270. db: Session = Depends(get_db),
  271. skip: int = 0,
  272. limit: int = 100,
  273. unread_only: bool = Query(False),
  274. current_user: User = Depends(deps.get_current_active_user),
  275. ) -> Any:
  276. """
  277. 获取当前用户的消息列表 (所有历史记录)
  278. """
  279. query = db.query(Message).options(joinedload(Message.app)).filter(Message.receiver_id == current_user.id)
  280. if unread_only:
  281. query = query.filter(Message.is_read == False)
  282. messages = query.order_by(Message.created_at.desc()).offset(skip).limit(limit).all()
  283. # 处理文件 URL 签名
  284. return [_process_message_content(msg) for msg in messages]
  285. @router.get("/conversations", response_model=List[ConversationResponse])
  286. def get_conversations(
  287. db: Session = Depends(get_db),
  288. current_user: User = Depends(deps.get_current_active_user)
  289. ) -> Any:
  290. """
  291. 获取当前用户的会话列表 (聚合)
  292. - 私信:按用户聚合
  293. - 系统通知:按应用(app)拆分成多个会话,类似多个“系统私信”
  294. """
  295. # 查找所有与我相关的消息,并预加载 app 信息,便于显示应用名
  296. messages = (
  297. db.query(Message)
  298. .options(joinedload(Message.app))
  299. .filter(
  300. or_(
  301. Message.sender_id == current_user.id,
  302. Message.receiver_id == current_user.id,
  303. )
  304. )
  305. .order_by(Message.created_at.desc())
  306. .limit(1000)
  307. .all()
  308. )
  309. conversations_map: dict[int, dict] = {}
  310. for msg in messages:
  311. # 确定对话方 (Counterpart)
  312. other_id = None
  313. other_user = None
  314. # 系统通知:按 app 拆分。约定:同一个 app 的系统会话使用负数 id,避免和真实用户冲突
  315. if msg.type == MessageType.NOTIFICATION:
  316. if msg.app_id:
  317. other_id = -int(msg.app_id)
  318. else:
  319. # 兼容历史数据:没有 app_id 的系统通知归为一个“系统通知”会话
  320. other_id = 0
  321. elif msg.sender_id == current_user.id and msg.receiver_id == current_user.id:
  322. # 文件传输助手
  323. other_id = current_user.id
  324. other_user = current_user
  325. elif msg.sender_id == current_user.id:
  326. other_id = msg.receiver_id
  327. other_user = msg.receiver
  328. else:
  329. other_id = msg.sender_id
  330. other_user = msg.sender
  331. # 如果是私信但没找到用户,跳过
  332. if other_id not in (0, None) and not other_user and msg.type != MessageType.NOTIFICATION:
  333. continue
  334. # 如果这个对话方还没处理过
  335. if other_id not in conversations_map:
  336. if msg.type == MessageType.NOTIFICATION:
  337. # 系统会话
  338. if msg.app_id and msg.app:
  339. username = msg.app.app_id or f"APP-{msg.app_id}"
  340. full_name = msg.app.app_name or username
  341. is_system = True
  342. app_id = msg.app_id
  343. app_name = msg.app.app_name
  344. else:
  345. # 老的统一系统通知
  346. username = "System"
  347. full_name = "系统通知"
  348. is_system = True
  349. app_id = None
  350. app_name = None
  351. else:
  352. # 普通用户会话
  353. username = other_user.mobile # User has mobile, not username
  354. full_name = other_user.name or other_user.english_name or other_user.mobile
  355. is_system = False
  356. app_id = None
  357. app_name = None
  358. conversations_map[other_id] = {
  359. "user_id": other_id,
  360. "username": username,
  361. "full_name": full_name,
  362. "unread_count": 0,
  363. "last_message": _conversation_last_preview(msg),
  364. "last_message_type": msg.content_type,
  365. "updated_at": msg.created_at,
  366. "is_system": is_system,
  367. "app_id": app_id,
  368. "app_name": app_name,
  369. }
  370. # 累加未读数 (只计算接收方是自己的未读消息)
  371. # 注意: 这里的 is_read 是针对接收者的状态
  372. # 即使是自己发送的消息,msg.receiver_id 也不会是自己(除非发给自己)
  373. # 所以这里的判断逻辑是: 如果我是接收者,且未读,则计入未读数
  374. if not msg.is_read and msg.receiver_id == current_user.id:
  375. conversations_map[other_id]["unread_count"] += 1
  376. return list(conversations_map.values())
  377. @router.get("/history/{other_user_id}", response_model=List[MessageResponse])
  378. def get_chat_history(
  379. other_user_id: int,
  380. skip: int = 0,
  381. limit: int = 50,
  382. db: Session = Depends(get_db),
  383. current_user: User = Depends(deps.get_current_active_user)
  384. ) -> Any:
  385. """
  386. 获取与特定用户/系统会话的聊天记录
  387. - other_user_id > 0: 普通用户私信
  388. - other_user_id == 0: 兼容历史的“所有系统通知”会话
  389. - other_user_id < 0: 按 app 拆分的系统通知会话(-app_id)
  390. """
  391. if other_user_id == 0:
  392. # 所有系统通知(兼容旧实现)
  393. query = db.query(Message).options(joinedload(Message.app)).filter(
  394. Message.receiver_id == current_user.id,
  395. Message.type == MessageType.NOTIFICATION,
  396. ).order_by(Message.created_at.desc())
  397. elif other_user_id < 0:
  398. # 单个应用的系统通知会话
  399. app_id = -other_user_id
  400. query = db.query(Message).options(joinedload(Message.app)).filter(
  401. Message.receiver_id == current_user.id,
  402. Message.type == MessageType.NOTIFICATION,
  403. Message.app_id == app_id,
  404. ).order_by(Message.created_at.desc())
  405. else:
  406. # 用户私信
  407. query = db.query(Message).options(joinedload(Message.app)).filter(
  408. or_(
  409. and_(Message.sender_id == current_user.id, Message.receiver_id == other_user_id),
  410. and_(Message.sender_id == other_user_id, Message.receiver_id == current_user.id),
  411. )
  412. ).order_by(Message.created_at.desc()) # 最新在前
  413. messages = query.offset(skip).limit(limit).all()
  414. return [_process_message_content(msg) for msg in messages]
  415. @router.put("/history/{other_user_id}/read-all", response_model=dict)
  416. def mark_conversation_read_all(
  417. other_user_id: int,
  418. db: Session = Depends(get_db),
  419. current_user: User = Depends(deps.get_current_active_user),
  420. ) -> Any:
  421. """
  422. 将某一「会话范围」内、当前用户作为接收方的未读消息全部标为已读(与 GET /history/{other_user_id} 范围一致)。
  423. - other_user_id > 0: 来自该用户发给我的消息(含发给自己的会话)
  424. - other_user_id == 0: 所有系统通知(与历史接口兼容)
  425. - other_user_id < 0: 某一应用下的系统通知(-applications.id,与会话列表中系统会话 id 约定一致)
  426. """
  427. now = datetime.now()
  428. if other_user_id == 0:
  429. q = db.query(Message).filter(
  430. Message.receiver_id == current_user.id,
  431. Message.type == MessageType.NOTIFICATION,
  432. Message.is_read == False,
  433. )
  434. elif other_user_id < 0:
  435. app_id = -other_user_id
  436. q = db.query(Message).filter(
  437. Message.receiver_id == current_user.id,
  438. Message.type == MessageType.NOTIFICATION,
  439. Message.app_id == app_id,
  440. Message.is_read == False,
  441. )
  442. else:
  443. q = db.query(Message).filter(
  444. Message.receiver_id == current_user.id,
  445. Message.sender_id == other_user_id,
  446. Message.is_read == False,
  447. )
  448. result = q.update(
  449. {"is_read": True, "read_at": now},
  450. synchronize_session=False,
  451. )
  452. db.commit()
  453. return {"updated_count": result}
  454. @router.get("/unread-count", response_model=int)
  455. def get_unread_count(
  456. db: Session = Depends(get_db),
  457. current_user: User = Depends(deps.get_current_active_user),
  458. ) -> Any:
  459. count = db.query(Message).filter(
  460. Message.receiver_id == current_user.id,
  461. Message.is_read == False
  462. ).count()
  463. return count
  464. @router.put("/{message_id}/read", response_model=MessageResponse, deprecated=True)
  465. def mark_as_read(
  466. message_id: int,
  467. db: Session = Depends(get_db),
  468. current_user: User = Depends(deps.get_current_active_user),
  469. ) -> Any:
  470. message = db.query(Message).filter(
  471. Message.id == message_id,
  472. Message.receiver_id == current_user.id
  473. ).first()
  474. if not message:
  475. raise HTTPException(status_code=404, detail="Message not found")
  476. if not message.is_read:
  477. message.is_read = True
  478. message.read_at = datetime.now()
  479. db.add(message)
  480. db.commit()
  481. db.refresh(message)
  482. return _process_message_content(message)
  483. @router.put("/read-all", response_model=dict)
  484. def mark_all_read(
  485. db: Session = Depends(get_db),
  486. current_user: User = Depends(deps.get_current_active_user),
  487. ) -> Any:
  488. now = datetime.now()
  489. result = db.query(Message).filter(
  490. Message.receiver_id == current_user.id,
  491. Message.is_read == False
  492. ).update(
  493. {
  494. "is_read": True,
  495. "read_at": now
  496. },
  497. synchronize_session=False
  498. )
  499. db.commit()
  500. return {"updated_count": result}
  501. @router.delete("/{message_id}", response_model=MessageResponse)
  502. def delete_message(
  503. message_id: int,
  504. db: Session = Depends(get_db),
  505. current_user: User = Depends(deps.get_current_active_user),
  506. ) -> Any:
  507. message = db.query(Message).filter(
  508. Message.id == message_id,
  509. Message.receiver_id == current_user.id
  510. ).first()
  511. if not message:
  512. raise HTTPException(status_code=404, detail="Message not found")
  513. # 先处理返回数据,避免删除后无法访问
  514. processed_msg = _process_message_content(message)
  515. db.delete(message)
  516. db.commit()
  517. return processed_msg
  518. @router.get("/{message_id}/callback-url", response_model=dict)
  519. def get_message_callback_url(
  520. message_id: int,
  521. db: Session = Depends(get_db),
  522. current_user: User = Depends(deps.get_current_active_user),
  523. ) -> Any:
  524. """
  525. 获取消息的 callback URL(用于通知按钮点击)
  526. 内部执行 jump 接口的逻辑,实时生成 ticket 和 callback URL
  527. """
  528. # 1. 获取消息
  529. message = db.query(Message).filter(Message.id == message_id).first()
  530. if not message:
  531. raise HTTPException(status_code=404, detail="消息未找到")
  532. # 2. 验证权限:只有接收者可以获取
  533. if message.receiver_id != current_user.id:
  534. raise HTTPException(status_code=403, detail="无权访问此消息")
  535. # 3. 检查是否有 action_url(jump 接口 URL)
  536. if not message.action_url:
  537. raise HTTPException(status_code=400, detail="此消息没有配置跳转链接")
  538. # 4. 解析 action_url,提取 app_id 和 redirect_to
  539. # action_url 格式: /api/v1/simple/sso/jump?app_id=xxx&redirect_to=xxx
  540. parsed = urlparse(message.action_url)
  541. if not parsed.path.endswith("/sso/jump"):
  542. raise HTTPException(status_code=400, detail="无效的跳转链接格式")
  543. query_params = parse_qs(parsed.query)
  544. app_id = query_params.get("app_id", [None])[0]
  545. redirect_to = query_params.get("redirect_to", [None])[0]
  546. if not app_id or not redirect_to:
  547. raise HTTPException(status_code=400, detail="跳转链接参数不完整")
  548. # 5. 执行 jump 接口的逻辑(但不返回 RedirectResponse,而是返回 JSON)
  549. app = db.query(Application).filter(Application.app_id == app_id).first()
  550. if not app:
  551. raise HTTPException(status_code=404, detail="应用未找到")
  552. # 6. 生成 Ticket(使用当前登录用户)
  553. ticket = TicketService.generate_ticket(current_user.id, app_id)
  554. # 7. 获取应用回调地址
  555. redirect_base = ""
  556. if app.redirect_uris:
  557. try:
  558. uris = json.loads(app.redirect_uris)
  559. if isinstance(uris, list) and len(uris) > 0:
  560. redirect_base = uris[0]
  561. elif isinstance(uris, str):
  562. redirect_base = uris
  563. except (json.JSONDecodeError, TypeError):
  564. redirect_base = app.redirect_uris.strip()
  565. if not redirect_base:
  566. raise HTTPException(status_code=400, detail="应用未配置回调地址")
  567. # 8. 构造最终 callback URL
  568. parsed_uri = urlparse(redirect_base)
  569. callback_query_params = parse_qs(parsed_uri.query)
  570. callback_query_params['ticket'] = [ticket]
  571. callback_query_params['next'] = [redirect_to]
  572. new_query = urlencode(callback_query_params, doseq=True)
  573. callback_url = urlunparse((
  574. parsed_uri.scheme,
  575. parsed_uri.netloc,
  576. parsed_uri.path,
  577. parsed_uri.params,
  578. new_query,
  579. parsed_uri.fragment
  580. ))
  581. return {"callback_url": callback_url}