"""Message endpoints: sending, listing, conversation aggregation, read state and SSO callback URLs."""

import json
from datetime import datetime
from typing import Any, List, Optional
from urllib.parse import quote, urlparse, parse_qs, urlencode, urlunparse

from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import or_, and_, desc

from app.core.database import get_db
from app.api.v1 import deps
from app.api.v1.deps import AuthSubject, get_current_user_or_app
from app.models.message import Message, MessageType
from app.models.user import User
from app.models.application import Application
from app.models.mapping import AppUserMapping
from app.schemas.message import MessageCreate, MessageResponse, MessageUpdate, ContentType, ConversationResponse
from app.core.websocket_manager import manager
from app.core.config import settings
from app.core.minio import minio_storage
from app.services.ticket_service import TicketService

router = APIRouter()

# Content types whose ``content`` field holds an object-storage key or URL.
_FILE_CONTENT_TYPES = [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]


def _conversation_last_preview(msg: Message) -> str:
    """Return the "last message" preview text shown in the conversation list.

    User-notification messages show their (stripped) title, falling back to a
    fixed label when the title is empty; text messages show their content;
    every other content type is rendered as ``[<content type>]``.
    """
    ct = msg.content_type
    # content_type may be an enum member or a plain string.
    cv = ct.value if hasattr(ct, "value") else str(ct)
    if cv == ContentType.USER_NOTIFICATION.value:
        title = (msg.title or "").strip()
        return title if title else "申请通知"
    if cv == ContentType.TEXT.value:
        return msg.content or ""
    return f"[{cv}]"


def _process_message_content(message: Message) -> MessageResponse:
    """Convert a ``Message`` row into a ``MessageResponse``.

    Fills in the application name when the message is linked to an application
    and, for image/video/file messages whose content does not start with
    ``http`` (i.e. is treated as an object key), replaces the content with a
    presigned URL when one can be generated.
    """
    # Pydantic v2: build the response schema from the ORM object.
    response = MessageResponse.model_validate(message)

    if message.app_id and message.app:
        response.app_name = message.app.app_name

    if message.content_type in _FILE_CONTENT_TYPES:
        # Content that already starts with "http" (legacy full URLs) is left unchanged.
        if message.content and not message.content.startswith("http"):
            signed_url = minio_storage.get_presigned_url(message.content)
            if signed_url:
                response.content = signed_url

    return response


def _require_application(db: Session, app_id: str) -> Application:
    """Return the application with the given string ``app_id`` or raise HTTP 404."""
    app = db.query(Application).filter(Application.app_id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail=f"应用未找到: {app_id}")
    return app


def _build_push_payload(
    processed_msg: MessageResponse,
    *,
    message_id: Any,
    sender_id: Any,
    app_id: Any,
    app_name: Any,
) -> dict:
    """Build the ``NEW_MESSAGE`` WebSocket payload for one recipient.

    ``message_id``, ``app_id`` and ``app_name`` are passed explicitly because the
    broadcast path reuses one processed message for many stored rows.
    """
    return {
        "type": "NEW_MESSAGE",
        "data": {
            "id": message_id,
            "type": processed_msg.type,
            "content_type": processed_msg.content_type,
            "title": processed_msg.title,
            # For file types this is the presigned URL, not the raw object key.
            "content": processed_msg.content,
            "action_url": processed_msg.action_url,
            "action_text": processed_msg.action_text,
            # Simplified label: no sender means a system notification.
            "sender_name": "系统通知" if not sender_id else "用户私信",
            # Lets the frontend decide on which side to render the message.
            "sender_id": sender_id,
            "created_at": str(processed_msg.created_at),
            # Application info so the frontend can split system conversations per app.
            "app_id": app_id,
            "app_name": app_name,
        },
    }


@router.post("/", response_model=MessageResponse)
async def create_message(
    *,
    db: Session = Depends(get_db),
    message_in: MessageCreate,
    current_subject: AuthSubject = Depends(get_current_user_or_app),
    background_tasks: BackgroundTasks
) -> Any:
    """
    Send a message (user private message or application notification).

    Permissions:
    - Users may only send MESSAGE.
    - Applications may send NOTIFICATION or MESSAGE.

    The receiver is resolved from ``receiver_id``, from ``app_id`` +
    ``app_user_id`` via the user mapping table, or is every active user when
    ``is_broadcast`` is set (NOTIFICATION only).

    Raises:
        HTTPException: 400 when no receiver is specified or a broadcast has no
            active users; 403 on permission or app_id mismatch; 404 when the
            application, receiver or a mapping is not found.
    """
    sender_id = None
    app_id_int = None  # Integer primary key, stored on the Message row.
    app_id_str = None  # String app_id, used in the SSO jump link.

    # 1. Authentication and identity resolution.
    if isinstance(current_subject, User):
        if message_in.type == MessageType.NOTIFICATION:
            raise HTTPException(status_code=403, detail="普通用户无权发送系统通知")
        sender_id = current_subject.id
        # A user may attach a (string) app_id; resolve it to the application row.
        if message_in.app_id:
            app = _require_application(db, message_in.app_id)
            app_id_int = app.id
            app_id_str = app.app_id
    elif isinstance(current_subject, Application):
        app_id_int = current_subject.id
        app_id_str = current_subject.app_id

        # Safety check: a supplied app_id must match the authenticated application.
        if message_in.app_id and message_in.app_id != current_subject.app_id:
            raise HTTPException(
                status_code=403,
                detail=f"传入的 app_id ({message_in.app_id}) 与认证应用不匹配"
            )
        # Always use the authenticated application's string app_id from here on.
        message_in.app_id = app_id_str

        # Optional for applications: resolve sender_app_user_id to a platform
        # user id. Only sender_id is persisted, not the app-side identifier.
        sender_app_user_id = getattr(message_in, "sender_app_user_id", None)
        if sender_app_user_id:
            sender_mapping = db.query(AppUserMapping).filter(
                AppUserMapping.app_id == app_id_int,
                AppUserMapping.mapped_key == sender_app_user_id
            ).first()
            if not sender_mapping:
                raise HTTPException(
                    status_code=404,
                    detail=f"发起人映射未找到: App {message_in.app_id}, sender_app_user_id={sender_app_user_id}"
                )
            sender_id = sender_mapping.user_id

    # 2. Receiver resolution.
    final_receiver_id = None
    is_broadcast = getattr(message_in, "is_broadcast", False)

    if is_broadcast:
        if message_in.type != MessageType.NOTIFICATION:
            raise HTTPException(status_code=403, detail="广播模式仅支持系统通知 (NOTIFICATION)")
    elif message_in.receiver_id:
        # Option A: the platform user id is given directly.
        final_receiver_id = message_in.receiver_id
        user = db.query(User).filter(User.id == final_receiver_id).first()
        if not user:
            raise HTTPException(status_code=404, detail="接收用户未找到")
    elif message_in.app_user_id and message_in.app_id:
        # Option B: look the receiver up through the app-user mapping.
        # Defensive: the integer id is normally already resolved in step 1.
        if app_id_int is None:
            app = _require_application(db, message_in.app_id)
            app_id_int = app.id
            app_id_str = app.app_id

        mapping = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == app_id_int,  # The mapping table is keyed by the integer id.
            AppUserMapping.mapped_key == message_in.app_user_id
        ).first()
        if not mapping:
            raise HTTPException(
                status_code=404,
                detail=f"用户映射未找到: App {message_in.app_id}, User {message_in.app_user_id}"
            )
        final_receiver_id = mapping.user_id
    else:
        raise HTTPException(status_code=400, detail="必须指定 receiver_id 或 (app_id + app_user_id),或者设置 is_broadcast=True")

    # 3. SSO jump link generation.
    final_action_url = message_in.action_url
    wants_sso_link = (
        message_in.auto_sso
        and app_id_str
        and message_in.target_url
        and (
            message_in.type == MessageType.NOTIFICATION
            or message_in.content_type == ContentType.USER_NOTIFICATION
        )
    )
    if wants_sso_link:
        # Store a link to the jump endpoint; the real callback URL is produced
        # later by get_message_callback_url when the user clicks.
        # Format: {API_V1_STR}/simple/sso/jump?app_id={APP_ID}&redirect_to={TARGET_URL}
        base_url = settings.API_V1_STR
        encoded_target = quote(message_in.target_url)
        final_action_url = f"{base_url}/simple/sso/jump?app_id={app_id_str}&redirect_to={encoded_target}"

    # Normalise content to a string. The frontend may JSON.parse the content
    # (e.g. USER_NOTIFICATION), so objects are serialised as valid JSON rather
    # than str(dict).
    if isinstance(message_in.content, str):
        content_val = message_in.content
    else:
        try:
            content_val = json.dumps(message_in.content, ensure_ascii=False, default=str)
        except Exception:
            # Deliberate best effort: fall back to the plain string form.
            content_val = str(message_in.content)

    if message_in.content_type in _FILE_CONTENT_TYPES:
        # Heuristic: content mentioning the bucket name and "http" is probably a
        # full URL; store only the object key so it can be re-signed on read.
        if settings.MINIO_BUCKET_NAME in content_val and "http" in content_val:
            try:
                path = urlparse(content_val).path.lstrip('/')
            except ValueError:
                # Malformed URL: keep the content as provided.
                path = ""
            # The path may look like "bucket_name/object_key".
            bucket_prefix = settings.MINIO_BUCKET_NAME + "/"
            if path.startswith(bucket_prefix):
                content_val = path[len(bucket_prefix):]

    # 4. Create the message(s).
    if is_broadcast:
        # All active, non-deleted users.
        active_users = db.query(User.id).filter(
            User.status == "ACTIVE",
            User.is_deleted == 0
        ).all()

        if not active_users:
            raise HTTPException(status_code=400, detail="没有可发送的活跃用户")

        messages_to_insert = [
            Message(
                sender_id=sender_id,
                receiver_id=u.id,
                app_id=app_id_int,
                type=message_in.type,
                content_type=message_in.content_type,
                title=message_in.title,
                content=content_val,
                action_url=final_action_url,
                action_text=message_in.action_text
            )
            for u in active_users
        ]
        db.add_all(messages_to_insert)
        # Flush so primary keys are assigned, and capture the push data before
        # commit: reading attributes after commit would reload each expired
        # row with its own SELECT, and the background task must not lazy-load.
        db.flush()
        push_tasks = [{"id": msg.id, "receiver_id": msg.receiver_id} for msg in messages_to_insert]
        db.commit()

        # The first stored message is used as the endpoint's return value.
        db.refresh(messages_to_insert[0])
        processed_msg = _process_message_content(messages_to_insert[0])

        async def send_broadcast_ws():
            for t in push_tasks:
                push_payload = _build_push_payload(
                    processed_msg,
                    message_id=t["id"],
                    sender_id=sender_id,
                    app_id=processed_msg.app_id,
                    app_name=processed_msg.app_name,
                )
                await manager.send_personal_message(push_payload, t["receiver_id"])

        background_tasks.add_task(send_broadcast_ws)
        return processed_msg

    message = Message(
        sender_id=sender_id,
        receiver_id=final_receiver_id,
        app_id=app_id_int,  # Integer id is what the Message table stores.
        type=message_in.type,
        content_type=message_in.content_type,
        title=message_in.title,
        content=content_val,
        action_url=final_action_url,
        action_text=message_in.action_text
    )
    db.add(message)
    db.commit()
    db.refresh(message)

    # 5. Real-time push over WebSocket, using the processed (signed) content.
    processed_msg = _process_message_content(message)
    push_payload = _build_push_payload(
        processed_msg,
        message_id=processed_msg.id,
        sender_id=sender_id,
        app_id=message.app_id,
        app_name=message.app.app_name if message.app else None,
    )

    # Sent from a background task so the HTTP response is not blocked.
    background_tasks.add_task(manager.send_personal_message, push_payload, final_receiver_id)

    return processed_msg


@router.get("/", response_model=List[MessageResponse])
def read_messages(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    unread_only: bool = Query(False),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """
    List messages received by the current user, newest first.

    ``unread_only`` restricts the result to unread messages; ``skip`` and
    ``limit`` paginate it.
    """
    query = (
        db.query(Message)
        .options(joinedload(Message.app))
        .filter(Message.receiver_id == current_user.id)
    )
    if unread_only:
        query = query.filter(Message.is_read == False)  # noqa: E712 (SQL expression)
    messages = query.order_by(Message.created_at.desc()).offset(skip).limit(limit).all()

    # Sign file URLs for the response.
    return [_process_message_content(msg) for msg in messages]


@router.get("/conversations", response_model=List[ConversationResponse])
def get_conversations(
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user)
) -> Any:
    """
    Return the current user's conversation list, aggregated from the 1000 most
    recent messages the user sent or received.

    - Private messages are grouped by the other user.
    - System notifications are split per application, one conversation each.
    """
    # Load the related application eagerly so its name can be displayed.
    messages = (
        db.query(Message)
        .options(joinedload(Message.app))
        .filter(
            or_(
                Message.sender_id == current_user.id,
                Message.receiver_id == current_user.id,
            )
        )
        .order_by(Message.created_at.desc())
        .limit(1000)
        .all()
    )

    conversations_map: dict[int, dict] = {}
    for msg in messages:
        is_notification = msg.type == MessageType.NOTIFICATION

        # Determine the counterpart of this message.
        other_user = None
        if is_notification:
            # Per-app system conversations use the negated app id so they never
            # collide with real user ids; notifications without an app_id
            # (legacy data) share conversation 0.
            other_id = -int(msg.app_id) if msg.app_id else 0
        elif msg.sender_id == current_user.id and msg.receiver_id == current_user.id:
            # Messages sent to oneself.
            other_id = current_user.id
            other_user = current_user
        elif msg.sender_id == current_user.id:
            other_id = msg.receiver_id
            other_user = msg.receiver
        else:
            other_id = msg.sender_id
            other_user = msg.sender

        # A private message without a resolvable counterpart cannot be shown.
        # This includes messages with no sender at all (other_id is None), which
        # previously fell through and failed on ``other_user.mobile``.
        if not is_notification and other_user is None:
            continue

        # Messages are newest first, so the first one seen per counterpart
        # supplies the conversation's preview and timestamp.
        if other_id not in conversations_map:
            if is_notification:
                is_system = True
                if msg.app_id and msg.app:
                    username = msg.app.app_id or f"APP-{msg.app_id}"
                    full_name = msg.app.app_name or username
                    app_id = msg.app_id
                    app_name = msg.app.app_name
                else:
                    # Legacy unified system-notification conversation.
                    username = "System"
                    full_name = "系统通知"
                    app_id = None
                    app_name = None
            else:
                # Users are identified by mobile; there is no username field.
                username = other_user.mobile
                full_name = other_user.name or other_user.english_name or other_user.mobile
                is_system = False
                app_id = None
                app_name = None

            conversations_map[other_id] = {
                "user_id": other_id,
                "username": username,
                "full_name": full_name,
                "unread_count": 0,
                "last_message": _conversation_last_preview(msg),
                "last_message_type": msg.content_type,
                "updated_at": msg.created_at,
                "is_system": is_system,
                "app_id": app_id,
                "app_name": app_name,
            }

        # is_read is the receiver's state, so only unread messages addressed to
        # the current user count.
        if not msg.is_read and msg.receiver_id == current_user.id:
            conversations_map[other_id]["unread_count"] += 1

    return list(conversations_map.values())


@router.get("/history/{other_user_id}", response_model=List[MessageResponse])
def get_chat_history(
    other_user_id: int,
    skip: int = 0,
    limit: int = 50,
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user)
) -> Any:
    """
    Return the chat history with a user or a system conversation, newest first.

    - other_user_id > 0: private messages exchanged with that user
    - other_user_id == 0: all system notifications (legacy-compatible)
    - other_user_id < 0: system notifications of one application (-app_id)
    """
    base_query = db.query(Message).options(joinedload(Message.app))

    if other_user_id == 0:
        scoped = base_query.filter(
            Message.receiver_id == current_user.id,
            Message.type == MessageType.NOTIFICATION,
        )
    elif other_user_id < 0:
        scoped = base_query.filter(
            Message.receiver_id == current_user.id,
            Message.type == MessageType.NOTIFICATION,
            Message.app_id == -other_user_id,
        )
    else:
        scoped = base_query.filter(
            or_(
                and_(Message.sender_id == current_user.id, Message.receiver_id == other_user_id),
                and_(Message.sender_id == other_user_id, Message.receiver_id == current_user.id),
            )
        )

    messages = scoped.order_by(Message.created_at.desc()).offset(skip).limit(limit).all()
    return [_process_message_content(msg) for msg in messages]


@router.put("/history/{other_user_id}/read-all", response_model=dict)
def mark_conversation_read_all(
    other_user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Mark as read every unread message in one conversation scope for which the
    current user is the receiver.

    - other_user_id > 0: messages that user sent to me (including the
      conversation with myself)
    - other_user_id == 0: all system notifications (legacy-compatible)
    - other_user_id < 0: system notifications of one application
      (-applications.id, the same convention as the conversation list)

    Returns ``{"updated_count": <number of rows updated>}``.
    """
    now = datetime.now()
    unread = db.query(Message).filter(
        Message.receiver_id == current_user.id,
        Message.is_read == False,  # noqa: E712 (SQL expression)
    )

    if other_user_id == 0:
        unread = unread.filter(Message.type == MessageType.NOTIFICATION)
    elif other_user_id < 0:
        unread = unread.filter(
            Message.type == MessageType.NOTIFICATION,
            Message.app_id == -other_user_id,
        )
    else:
        unread = unread.filter(Message.sender_id == other_user_id)

    result = unread.update(
        {"is_read": True, "read_at": now},
        synchronize_session=False,
    )
    db.commit()
    return {"updated_count": result}


@router.get("/unread-count", response_model=int)
def get_unread_count(
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """Return the number of unread messages addressed to the current user."""
    return db.query(Message).filter(
        Message.receiver_id == current_user.id,
        Message.is_read == False  # noqa: E712 (SQL expression)
    ).count()


@router.put("/{message_id}/read", response_model=MessageResponse, deprecated=True)
def mark_as_read(
    message_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """Mark one message received by the current user as read and return it.

    Raises:
        HTTPException: 404 when the message does not exist or is not addressed
            to the current user.
    """
    message = db.query(Message).filter(
        Message.id == message_id,
        Message.receiver_id == current_user.id
    ).first()
    if not message:
        raise HTTPException(status_code=404, detail="Message not found")

    # Already-read messages keep their original read_at timestamp.
    if not message.is_read:
        message.is_read = True
        message.read_at = datetime.now()
        db.add(message)
        db.commit()
        db.refresh(message)

    return _process_message_content(message)


@router.put("/read-all", response_model=dict)
def mark_all_read(
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """Mark every unread message addressed to the current user as read.

    Returns ``{"updated_count": <number of rows updated>}``.
    """
    now = datetime.now()
    result = db.query(Message).filter(
        Message.receiver_id == current_user.id,
        Message.is_read == False  # noqa: E712 (SQL expression)
    ).update(
        {"is_read": True, "read_at": now},
        synchronize_session=False
    )
    db.commit()
    return {"updated_count": result}


@router.delete("/{message_id}", response_model=MessageResponse)
def delete_message(
    message_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """Delete one message received by the current user and return its data.

    Raises:
        HTTPException: 404 when the message does not exist or is not addressed
            to the current user.
    """
    message = db.query(Message).filter(
        Message.id == message_id,
        Message.receiver_id == current_user.id
    ).first()
    if not message:
        raise HTTPException(status_code=404, detail="Message not found")

    # Build the response first; the row is not accessible after deletion.
    processed_msg = _process_message_content(message)

    db.delete(message)
    db.commit()
    return processed_msg


@router.get("/{message_id}/callback-url", response_model=dict)
def get_message_callback_url(
    message_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Return the callback URL for a message's action button.

    Parses the stored jump link, generates a ticket for the current user and
    builds the application's callback URL with ``ticket`` and ``next`` query
    parameters. Returns ``{"callback_url": <url>}`` as JSON instead of
    redirecting.

    Raises:
        HTTPException: 404 when the message or application is not found; 403
            when the current user is not the receiver; 400 when the message has
            no action link, the link is malformed or incomplete, or the
            application has no callback address.
    """
    # 1. Load the message.
    message = db.query(Message).filter(Message.id == message_id).first()
    if not message:
        raise HTTPException(status_code=404, detail="消息未找到")

    # 2. Only the receiver may obtain the callback URL.
    if message.receiver_id != current_user.id:
        raise HTTPException(status_code=403, detail="无权访问此消息")

    # 3. The message must carry an action_url (the jump link).
    if not message.action_url:
        raise HTTPException(status_code=400, detail="此消息没有配置跳转链接")

    # 4. Parse the jump link and extract app_id and redirect_to.
    # Expected format: /api/v1/simple/sso/jump?app_id=xxx&redirect_to=xxx
    parsed = urlparse(message.action_url)
    if not parsed.path.endswith("/sso/jump"):
        raise HTTPException(status_code=400, detail="无效的跳转链接格式")

    query_params = parse_qs(parsed.query)
    app_id = query_params.get("app_id", [None])[0]
    redirect_to = query_params.get("redirect_to", [None])[0]

    if not app_id or not redirect_to:
        raise HTTPException(status_code=400, detail="跳转链接参数不完整")

    # 5. Resolve the target application.
    app = db.query(Application).filter(Application.app_id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail="应用未找到")

    # 6. Generate a ticket for the currently logged-in user.
    ticket = TicketService.generate_ticket(current_user.id, app_id)

    # 7. Pick the application's callback address. redirect_uris may be a JSON
    # list (first entry is used), a JSON string, or a plain non-JSON string.
    redirect_base = ""
    if app.redirect_uris:
        try:
            uris = json.loads(app.redirect_uris)
            if isinstance(uris, list) and len(uris) > 0:
                redirect_base = uris[0]
            elif isinstance(uris, str):
                redirect_base = uris
        except (json.JSONDecodeError, TypeError):
            redirect_base = app.redirect_uris.strip()

    if not redirect_base:
        raise HTTPException(status_code=400, detail="应用未配置回调地址")

    # 8. Build the final callback URL, keeping any query parameters already
    # present on the callback address.
    parsed_uri = urlparse(redirect_base)
    callback_query_params = parse_qs(parsed_uri.query)
    callback_query_params['ticket'] = [ticket]
    callback_query_params['next'] = [redirect_to]

    new_query = urlencode(callback_query_params, doseq=True)
    callback_url = urlunparse((
        parsed_uri.scheme,
        parsed_uri.netloc,
        parsed_uri.path,
        parsed_uri.params,
        new_query,
        parsed_uri.fragment
    ))

    return {"callback_url": callback_url}