| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 |
- from typing import Any, List, Optional
- from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
- from sqlalchemy.orm import Session, joinedload
- from sqlalchemy import or_, and_, desc
- from app.core.database import get_db
- from app.api.v1 import deps
- from app.api.v1.deps import AuthSubject, get_current_user_or_app
- from app.models.message import Message, MessageType
- from app.models.user import User
- from app.models.application import Application
- from app.models.mapping import AppUserMapping
- from app.schemas.message import MessageCreate, MessageResponse, MessageUpdate, ContentType, ConversationResponse
- from app.core.websocket_manager import manager
- from app.core.config import settings
- from app.core.minio import minio_storage
- from datetime import datetime
- from urllib.parse import quote, urlparse
- router = APIRouter()
- def _process_message_content(message: Message) -> MessageResponse:
- """处理消息内容,为文件类型生成预签名 URL"""
- # Pydantic v2 use model_validate
- response = MessageResponse.model_validate(message)
-
- # 填充应用名称
- if message.app_id and message.app:
- response.app_name = message.app.app_name
-
- if message.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
- # 如果内容是对象 Key (不以 http 开头),则生成预签名 URL
- # 如果是旧数据的完整 URL,则保持不变 (或视需求处理)
- if message.content and not message.content.startswith("http"):
- signed_url = minio_storage.get_presigned_url(message.content)
- if signed_url:
- response.content = signed_url
-
- return response
- @router.post("/", response_model=MessageResponse)
- async def create_message(
- *,
- db: Session = Depends(get_db),
- message_in: MessageCreate,
- current_subject: AuthSubject = Depends(get_current_user_or_app),
- background_tasks: BackgroundTasks
- ) -> Any:
- """
- 发送消息 (支持用户私信和应用通知)
- 权限:
- - 用户:只能发送 MESSAGE
- - 应用:可以发送 NOTIFICATION 或 MESSAGE
- """
- sender_id = None
- app_id = None
- # 1. 鉴权与身份识别
- if isinstance(current_subject, User):
- # 用户发送
- if message_in.type == MessageType.NOTIFICATION:
- raise HTTPException(status_code=403, detail="普通用户无权发送系统通知")
- sender_id = current_subject.id
-
- elif isinstance(current_subject, Application):
- # 应用发送
- app_id = current_subject.id
- # 安全校验: 确保传入的 app_id (如果有) 与签名身份一致
- if message_in.app_id and message_in.app_id != current_subject.id:
- # 这里我们选择忽略传入的 app_id,强制使用当前认证的应用 ID
- pass
- message_in.app_id = app_id
- # 2. 确定接收者 (Receiver Resolution)
- final_receiver_id = None
- if message_in.receiver_id:
- # 方式 A: 直接指定 User ID
- final_receiver_id = message_in.receiver_id
- user = db.query(User).filter(User.id == final_receiver_id).first()
- if not user:
- raise HTTPException(status_code=404, detail="接收用户未找到")
-
- elif message_in.app_user_id and message_in.app_id:
- # 方式 B: 通过 App User ID 查找映射
- # 注意:如果是用户发送,必须要提供 app_id 才能查映射
- # 如果是应用发送,message_in.app_id 已经被赋值为 current_subject.id
-
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == message_in.app_id,
- AppUserMapping.mapped_key == message_in.app_user_id
- ).first()
-
- if not mapping:
- raise HTTPException(
- status_code=404,
- detail=f"用户映射未找到: App {message_in.app_id}, User {message_in.app_user_id}"
- )
- final_receiver_id = mapping.user_id
- else:
- raise HTTPException(status_code=400, detail="必须指定 receiver_id 或 (app_id + app_user_id)")
- # 3. 处理 SSO 跳转链接 (Link Generation)
- final_action_url = message_in.action_url
-
- if message_in.type == MessageType.NOTIFICATION and message_in.auto_sso and message_in.app_id and message_in.target_url:
- # 构造 SSO 中转链接
- # 格式: {PLATFORM_URL}/api/v1/auth/sso/jump?app_id={APP_ID}&redirect_to={TARGET_URL}
-
- # 假设 settings.SERVER_HOST 配置了当前服务地址,如果没有则使用相对路径或默认值
- # 这里为了演示,假设前端或API base url
- base_url = settings.API_V1_STR # /api/v1
-
- encoded_target = quote(message_in.target_url)
- final_action_url = f"{base_url}/auth/sso/jump?app_id={message_in.app_id}&redirect_to={encoded_target}"
- # 处理内容 (如果是文件类型且传入的是 URL,尝试提取 Key)
- content_val = message_in.content if isinstance(message_in.content, str) else str(message_in.content)
- if message_in.content_type in [ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE]:
- # 简单判断: 如果包含 bucket name,可能是 URL
- if settings.MINIO_BUCKET_NAME in content_val and "http" in content_val:
- try:
- # 尝试从 URL 中提取 path
- parsed = urlparse(content_val)
- path = parsed.path.lstrip('/')
- # path 可能是 "bucket_name/object_key"
- if path.startswith(settings.MINIO_BUCKET_NAME + "/"):
- content_val = path[len(settings.MINIO_BUCKET_NAME)+1:]
- except:
- pass # 提取失败则保持原样
- # 4. 创建消息
- message = Message(
- sender_id=sender_id,
- receiver_id=final_receiver_id,
- app_id=message_in.app_id,
- type=message_in.type,
- content_type=message_in.content_type,
- title=message_in.title,
- content=content_val,
- action_url=final_action_url,
- action_text=message_in.action_text
- )
- db.add(message)
- db.commit()
- db.refresh(message)
-
- # 5. 触发实时推送 (WebSocket)
- # 处理用于推送的消息内容 (签名)
- processed_msg = _process_message_content(message)
-
- push_payload = {
- "type": "NEW_MESSAGE",
- "data": {
- "id": processed_msg.id,
- "type": processed_msg.type,
- "content_type": processed_msg.content_type,
- "title": processed_msg.title,
- "content": processed_msg.content, # 使用签名后的 URL
- "action_url": processed_msg.action_url,
- "action_text": processed_msg.action_text,
- "sender_name": "系统通知" if not sender_id else "用户私信", # 简化处理
- "sender_id": sender_id, # Add sender_id for frontend to decide left/right
- "created_at": str(processed_msg.created_at)
- }
- }
-
- # 使用后台任务发送 WS 消息,避免阻塞 HTTP 响应
- # 如果是发给自己,receiver_id == sender_id,ws 会收到一次
- background_tasks.add_task(manager.send_personal_message, push_payload, final_receiver_id)
- return processed_msg
- @router.get("/", response_model=List[MessageResponse])
- def read_messages(
- db: Session = Depends(get_db),
- skip: int = 0,
- limit: int = 100,
- unread_only: bool = False,
- current_user: User = Depends(deps.get_current_active_user),
- ) -> Any:
- """
- 获取当前用户的消息列表 (所有历史记录)
- """
- query = db.query(Message).options(joinedload(Message.app)).filter(Message.receiver_id == current_user.id)
- if unread_only:
- query = query.filter(Message.is_read == False)
-
- messages = query.order_by(Message.created_at.desc()).offset(skip).limit(limit).all()
-
- # 处理文件 URL 签名
- return [_process_message_content(msg) for msg in messages]
- @router.get("/conversations", response_model=List[ConversationResponse])
- def get_conversations(
- db: Session = Depends(get_db),
- current_user: User = Depends(deps.get_current_active_user)
- ) -> Any:
- """
- 获取当前用户的会话列表 (聚合)
- """
- # 查找所有与我相关的消息
- messages = db.query(Message).filter(
- or_(
- Message.sender_id == current_user.id,
- Message.receiver_id == current_user.id
- )
- ).order_by(Message.created_at.desc()).limit(1000).all()
- conversations_map = {}
-
- for msg in messages:
- # 确定对话方 (Counterpart)
- other_id = None
- other_user = None
- if msg.type == MessageType.NOTIFICATION:
- # 系统通知,归类为一个特殊的 ID 0
- other_id = 0
- # No user object needed for system
- elif msg.sender_id == current_user.id and msg.receiver_id == current_user.id:
- # 文件传输助手
- other_id = current_user.id
- other_user = current_user
- elif msg.sender_id == current_user.id:
- other_id = msg.receiver_id
- other_user = msg.receiver
- else:
- other_id = msg.sender_id
- other_user = msg.sender
-
- # 如果是私信但没找到用户,跳过
- if other_id != 0 and not other_user:
- continue
- # 如果这个对话方还没处理过
- if other_id not in conversations_map:
- if other_id == 0:
- username = "System"
- full_name = "系统通知"
- else:
- username = other_user.mobile # User has mobile, not username
- full_name = other_user.name or other_user.english_name or other_user.mobile
- conversations_map[other_id] = {
- "user_id": other_id,
- "username": username,
- "full_name": full_name,
- "unread_count": 0,
- "last_message": msg.content if msg.content_type == ContentType.TEXT else f"[{msg.content_type}]",
- "last_message_type": msg.content_type,
- "updated_at": msg.created_at
- }
-
- # 累加未读数 (只计算接收方是自己的未读消息)
- # 注意: 这里的 is_read 是针对接收者的状态
- # 即使是自己发送的消息,msg.receiver_id 也不会是自己(除非发给自己)
- # 所以这里的判断逻辑是: 如果我是接收者,且未读,则计入未读数
- if not msg.is_read and msg.receiver_id == current_user.id:
- conversations_map[other_id]["unread_count"] += 1
- return list(conversations_map.values())
- @router.get("/history/{other_user_id}", response_model=List[MessageResponse])
- def get_chat_history(
- other_user_id: int,
- skip: int = 0,
- limit: int = 50,
- db: Session = Depends(get_db),
- current_user: User = Depends(deps.get_current_active_user)
- ) -> Any:
- """
- 获取与特定用户的聊天记录
- """
- if other_user_id == 0:
- # System Notifications
- query = db.query(Message).options(joinedload(Message.app)).filter(
- Message.receiver_id == current_user.id,
- Message.type == MessageType.NOTIFICATION
- ).order_by(Message.created_at.desc())
- else:
- # User Chat
- query = db.query(Message).options(joinedload(Message.app)).filter(
- or_(
- and_(Message.sender_id == current_user.id, Message.receiver_id == other_user_id),
- and_(Message.sender_id == other_user_id, Message.receiver_id == current_user.id)
- )
- ).order_by(Message.created_at.desc()) # 最新在前
-
- messages = query.offset(skip).limit(limit).all()
-
- return [_process_message_content(msg) for msg in messages]
- @router.get("/unread-count", response_model=int)
- def get_unread_count(
- db: Session = Depends(get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ) -> Any:
- count = db.query(Message).filter(
- Message.receiver_id == current_user.id,
- Message.is_read == False
- ).count()
- return count
- @router.put("/{message_id}/read", response_model=MessageResponse)
- def mark_as_read(
- message_id: int,
- db: Session = Depends(get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ) -> Any:
- message = db.query(Message).filter(
- Message.id == message_id,
- Message.receiver_id == current_user.id
- ).first()
- if not message:
- raise HTTPException(status_code=404, detail="Message not found")
-
- if not message.is_read:
- message.is_read = True
- message.read_at = datetime.now()
- db.add(message)
- db.commit()
- db.refresh(message)
-
- return _process_message_content(message)
- @router.put("/read-all", response_model=dict)
- def mark_all_read(
- db: Session = Depends(get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ) -> Any:
- now = datetime.now()
- result = db.query(Message).filter(
- Message.receiver_id == current_user.id,
- Message.is_read == False
- ).update(
- {
- "is_read": True,
- "read_at": now
- },
- synchronize_session=False
- )
- db.commit()
- return {"updated_count": result}
- @router.delete("/{message_id}", response_model=MessageResponse)
- def delete_message(
- message_id: int,
- db: Session = Depends(get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ) -> Any:
- message = db.query(Message).filter(
- Message.id == message_id,
- Message.receiver_id == current_user.id
- ).first()
- if not message:
- raise HTTPException(status_code=404, detail="Message not found")
-
- # 先处理返回数据,避免删除后无法访问
- processed_msg = _process_message_content(message)
-
- db.delete(message)
- db.commit()
- return processed_msg
|