| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574 |
- from typing import Any, List, Optional
- from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
- from sqlalchemy.orm import Session, joinedload
- from sqlalchemy import or_, and_, desc
- from app.core.database import get_db
- from app.api.v1 import deps
- from app.api.v1.deps import AuthSubject, get_current_user_or_app
- from app.models.message import Message, MessageType
- from app.models.user import User
- from app.models.application import Application
- from app.models.mapping import AppUserMapping
- from app.schemas.message import MessageCreate, MessageResponse, MessageUpdate, ContentType, ConversationResponse
- from app.core.websocket_manager import manager
- from app.core.config import settings
- from app.core.minio import minio_storage
- from app.services.ticket_service import TicketService
- from datetime import datetime
- from urllib.parse import quote, urlparse, parse_qs, urlencode, urlunparse
- import json
- router = APIRouter()
def _process_message_content(message: Message) -> MessageResponse:
    """Convert a ``Message`` row into the API response model.

    The response is built with ``MessageResponse.model_validate`` and then
    two fields are post-processed:

    * ``app_name`` is filled from the related application when the message
      has an ``app_id`` and its ``app`` relationship resolves.
    * For image/video/file messages whose ``content`` is a storage object
      key rather than an absolute URL, ``content`` is replaced by the URL
      returned from ``minio_storage.get_presigned_url``. When that call
      returns a falsy value, the stored content is left as it is.

    Args:
        message: The message to convert.

    Returns:
        The populated ``MessageResponse``.
    """
    # Pydantic v2 entry point for building a model from an object.
    response = MessageResponse.model_validate(message)

    # Fill in the application name.
    if message.app_id and message.app:
        response.app_name = message.app.app_name

    if message.content_type in (ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE):
        stored = message.content
        # Only genuine http(s) URLs (legacy rows) are passed through as-is.
        # Testing for the bare prefix "http" would also match object keys
        # such as "http_dump.png" and leave them without a signed URL.
        if stored and not stored.startswith(("http://", "https://")):
            signed_url = minio_storage.get_presigned_url(stored)
            if signed_url:
                response.content = signed_url

    return response
def _find_application_or_404(db: Session, app_id: str) -> Application:
    """Return the application with the given string ``app_id`` or raise 404."""
    app = db.query(Application).filter(Application.app_id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail=f"应用未找到: {app_id}")
    return app


def _normalize_message_content(content: Any, content_type: Any) -> str:
    """Return the content string to store for a new message.

    Non-string content is converted with ``str()``. For image/video/file
    messages whose content looks like a URL into the configured bucket, the
    ``"<bucket>/"`` prefix is stripped from the URL path so that only the
    object key is stored. Anything else is returned unchanged.
    """
    content_val = content if isinstance(content, str) else str(content)
    if content_type not in (ContentType.IMAGE, ContentType.VIDEO, ContentType.FILE):
        return content_val

    bucket = settings.MINIO_BUCKET_NAME
    # Cheap heuristic: only values mentioning both the bucket and "http"
    # are treated as candidate URLs.
    if bucket not in content_val or "http" not in content_val:
        return content_val

    try:
        path = urlparse(content_val).path.lstrip("/")
    except ValueError:
        # urlparse rejects some malformed URLs; keep the submitted value.
        return content_val

    # The path is expected to look like "<bucket>/<object key>".
    prefix = bucket + "/"
    if path.startswith(prefix):
        return path[len(prefix):]
    return content_val


def _build_new_message_payload(
    processed_msg: MessageResponse,
    *,
    message_id: Any,
    sender_id: Any,
    app_id: Any,
    app_name: Any,
) -> dict:
    """Build the ``NEW_MESSAGE`` WebSocket payload for one recipient."""
    return {
        "type": "NEW_MESSAGE",
        "data": {
            "id": message_id,
            "type": processed_msg.type,
            "content_type": processed_msg.content_type,
            "title": processed_msg.title,
            # Already replaced by a signed URL where applicable.
            "content": processed_msg.content,
            "action_url": processed_msg.action_url,
            "action_text": processed_msg.action_text,
            # Simplified label: no sender means the message came from an app.
            "sender_name": "系统通知" if not sender_id else "用户私信",
            # Lets the frontend decide which side of the chat to render on.
            "sender_id": sender_id,
            "created_at": str(processed_msg.created_at),
            # App info lets the frontend split notifications per application.
            "app_id": app_id,
            "app_name": app_name,
        },
    }


@router.post("/", response_model=MessageResponse)
async def create_message(
    *,
    db: Session = Depends(get_db),
    message_in: MessageCreate,
    current_subject: AuthSubject = Depends(get_current_user_or_app),
    background_tasks: BackgroundTasks
) -> Any:
    """Send a message (user private message or application notification).

    Permissions:
    - Users may only send MESSAGE.
    - Applications may send NOTIFICATION or MESSAGE.

    The receiver is chosen by, in order: ``is_broadcast`` (every active
    user, NOTIFICATION only), ``receiver_id``, or ``app_id`` +
    ``app_user_id`` resolved through the app-user mapping table.

    Returns:
        The stored message as a ``MessageResponse``. For a broadcast, the
        response describes the first inserted row.

    Raises:
        HTTPException: 403 when a user sends a NOTIFICATION, when the given
            ``app_id`` does not match the authenticated application, or when
            a broadcast is not a NOTIFICATION; 404 when the application,
            receiver or user mapping cannot be found; 400 when no receiver
            is specified or a broadcast finds no active users.
    """
    sender_id = None
    app_id_int = None  # integer primary key, stored on the Message row
    app_id_str = None  # string app_id, used in the SSO jump link

    # 1. Authenticate and identify the caller.
    if isinstance(current_subject, User):
        if message_in.type == MessageType.NOTIFICATION:
            raise HTTPException(status_code=403, detail="普通用户无权发送系统通知")
        sender_id = current_subject.id

        # A user may attach an application by its string app_id.
        if message_in.app_id:
            app = _find_application_or_404(db, message_in.app_id)
            app_id_int = app.id
            app_id_str = app.app_id

    elif isinstance(current_subject, Application):
        app_id_int = current_subject.id
        app_id_str = current_subject.app_id

        # Safety check: an explicit app_id must match the signed identity.
        if message_in.app_id and message_in.app_id != current_subject.app_id:
            raise HTTPException(
                status_code=403,
                detail=f"传入的 app_id ({message_in.app_id}) 与认证应用不匹配"
            )
        # Always use the authenticated application's string app_id.
        message_in.app_id = app_id_str

    # 2. Resolve the receiver.
    final_receiver_id = None
    is_broadcast = getattr(message_in, "is_broadcast", False)
    if is_broadcast:
        if message_in.type != MessageType.NOTIFICATION:
            raise HTTPException(status_code=403, detail="广播模式仅支持系统通知 (NOTIFICATION)")
    elif message_in.receiver_id:
        # Option A: the platform user ID is given directly.
        final_receiver_id = message_in.receiver_id
        user = db.query(User).filter(User.id == final_receiver_id).first()
        if not user:
            raise HTTPException(status_code=404, detail="接收用户未找到")
    elif message_in.app_user_id and message_in.app_id:
        # Option B: look the user up through the app-user mapping.
        # Defensive: resolve the application if step 1 did not already.
        if app_id_int is None:
            app = _find_application_or_404(db, message_in.app_id)
            app_id_int = app.id
            app_id_str = app.app_id

        mapping = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == app_id_int,  # mapping table uses the integer ID
            AppUserMapping.mapped_key == message_in.app_user_id
        ).first()
        if not mapping:
            raise HTTPException(
                status_code=404,
                detail=f"用户映射未找到: App {message_in.app_id}, User {message_in.app_user_id}"
            )
        final_receiver_id = mapping.user_id
    else:
        raise HTTPException(status_code=400, detail="必须指定 receiver_id 或 (app_id + app_user_id),或者设置 is_broadcast=True")

    # 3. Build the SSO jump link.
    final_action_url = message_in.action_url
    if message_in.type == MessageType.NOTIFICATION and message_in.auto_sso and app_id_str and message_in.target_url:
        # Points at the jump endpoint; the callback URL is produced only
        # when the user actually clicks.
        # Format: {API_V1_STR}/simple/sso/jump?app_id={APP_ID}&redirect_to={TARGET_URL}
        base_url = settings.API_V1_STR
        encoded_target = quote(message_in.target_url)
        final_action_url = f"{base_url}/simple/sso/jump?app_id={app_id_str}&redirect_to={encoded_target}"

    # 4. Normalise content (a bucket URL is reduced to its object key).
    content_val = _normalize_message_content(message_in.content, message_in.content_type)

    # Column values shared by every row created for this request.
    shared_fields = {
        "sender_id": sender_id,
        "app_id": app_id_int,
        "type": message_in.type,
        "content_type": message_in.content_type,
        "title": message_in.title,
        "content": content_val,
        "action_url": final_action_url,
        "action_text": message_in.action_text,
    }

    # 5. Create the message(s) and schedule the WebSocket push.
    if is_broadcast:
        # Every active, non-deleted user receives its own row.
        active_users = db.query(User.id).filter(
            User.status == "ACTIVE",
            User.is_deleted == 0
        ).all()
        if not active_users:
            raise HTTPException(status_code=400, detail="没有可发送的活跃用户")

        messages_to_insert = [
            Message(receiver_id=u.id, **shared_fields) for u in active_users
        ]
        db.add_all(messages_to_insert)

        # Flush so primary keys are assigned, and capture them BEFORE the
        # commit. With the default expire_on_commit, reading msg.id after
        # commit would reload each row individually.
        db.flush()
        push_tasks = [
            {"id": msg.id, "receiver_id": msg.receiver_id}
            for msg in messages_to_insert
        ]
        db.commit()

        # The first row is used as the endpoint's return value.
        db.refresh(messages_to_insert[0])
        processed_msg = _process_message_content(messages_to_insert[0])

        async def send_broadcast_ws():
            for t in push_tasks:
                push_payload = _build_new_message_payload(
                    processed_msg,
                    message_id=t["id"],
                    sender_id=sender_id,
                    app_id=processed_msg.app_id,
                    app_name=processed_msg.app_name,
                )
                await manager.send_personal_message(push_payload, t["receiver_id"])

        background_tasks.add_task(send_broadcast_ws)
        return processed_msg

    message = Message(receiver_id=final_receiver_id, **shared_fields)
    db.add(message)
    db.commit()
    db.refresh(message)

    # Sign the content once; the same result is pushed and returned.
    processed_msg = _process_message_content(message)
    push_payload = _build_new_message_payload(
        processed_msg,
        message_id=processed_msg.id,
        sender_id=sender_id,
        app_id=message.app_id,
        app_name=message.app.app_name if message.app else None,
    )

    # Push from a background task so the HTTP response is not blocked.
    background_tasks.add_task(manager.send_personal_message, push_payload, final_receiver_id)

    return processed_msg
@router.get("/", response_model=List[MessageResponse])
def read_messages(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    unread_only: bool = False,
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """List messages received by the current user, newest first.

    Args:
        skip: Number of rows to skip (offset).
        limit: Maximum number of rows to return.
        unread_only: When true, only unread messages are returned.

    Returns:
        The matching messages as ``MessageResponse`` objects, each passed
        through ``_process_message_content``.
    """
    criteria = [Message.receiver_id == current_user.id]
    if unread_only:
        criteria.append(Message.is_read == False)  # noqa: E712 - SQL expression

    rows = (
        db.query(Message)
        .options(joinedload(Message.app))
        .filter(*criteria)
        .order_by(Message.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )

    # Sign file URLs while converting to response models.
    return [_process_message_content(row) for row in rows]
@router.get("/conversations", response_model=List[ConversationResponse])
def get_conversations(
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user)
) -> Any:
    """Return the current user's conversation list (aggregated).

    - Private messages are grouped by the other user.
    - System notifications are split per application, each acting as its
      own "system" conversation.

    Only the 1000 most recent messages sent or received by the user are
    considered. Each conversation's preview comes from its newest message.

    Returns:
        A list of conversation dicts, ordered by most recent message first.
    """
    # Load messages involving the user, with app info for display names.
    messages = (
        db.query(Message)
        .options(joinedload(Message.app))
        .filter(
            or_(
                Message.sender_id == current_user.id,
                Message.receiver_id == current_user.id,
            )
        )
        .order_by(Message.created_at.desc())
        .limit(1000)
        .all()
    )
    conversations_map: dict[int, dict] = {}

    for msg in messages:
        is_notification = msg.type == MessageType.NOTIFICATION

        # Work out the counterpart of this message.
        other_user = None
        if is_notification:
            # Per-app system conversations use a negative ID so they cannot
            # collide with real user IDs. Legacy notifications without an
            # app_id share conversation 0.
            other_id = -int(msg.app_id) if msg.app_id else 0
        elif msg.sender_id == current_user.id and msg.receiver_id == current_user.id:
            # Message to self (file-transfer assistant).
            other_id = current_user.id
            other_user = current_user
        elif msg.sender_id == current_user.id:
            other_id = msg.receiver_id
            other_user = msg.receiver
        else:
            other_id = msg.sender_id
            other_user = msg.sender

        # Skip private messages whose counterpart user cannot be resolved.
        # This includes messages with no sender at all (e.g. an application
        # sending a MESSAGE), which would otherwise fail below when the
        # counterpart's attributes are read.
        if not is_notification and not other_user:
            continue

        # First (newest) message for this counterpart creates the entry.
        if other_id not in conversations_map:
            if is_notification:
                is_system = True
                if msg.app_id and msg.app:
                    username = msg.app.app_id or f"APP-{msg.app_id}"
                    full_name = msg.app.app_name or username
                    app_id = msg.app_id
                    app_name = msg.app.app_name
                else:
                    # Legacy, unified system-notification conversation.
                    username = "System"
                    full_name = "系统通知"
                    app_id = None
                    app_name = None
            else:
                # Regular user conversation; the mobile number is used as
                # the username.
                username = other_user.mobile
                full_name = other_user.name or other_user.english_name or other_user.mobile
                is_system = False
                app_id = None
                app_name = None

            conversations_map[other_id] = {
                "user_id": other_id,
                "username": username,
                "full_name": full_name,
                "unread_count": 0,
                "last_message": msg.content if msg.content_type == ContentType.TEXT else f"[{msg.content_type}]",
                "last_message_type": msg.content_type,
                "updated_at": msg.created_at,
                "is_system": is_system,
                "app_id": app_id,
                "app_name": app_name,
            }

        # is_read describes the receiver's state, so only messages the
        # current user received count towards the unread total.
        if not msg.is_read and msg.receiver_id == current_user.id:
            conversations_map[other_id]["unread_count"] += 1

    return list(conversations_map.values())
@router.get("/history/{other_user_id}", response_model=List[MessageResponse])
def get_chat_history(
    other_user_id: int,
    skip: int = 0,
    limit: int = 50,
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user)
) -> Any:
    """Return the chat history with a user or a system conversation.

    - ``other_user_id > 0``: private messages exchanged with that user.
    - ``other_user_id == 0``: all system notifications (legacy behaviour).
    - ``other_user_id < 0``: notifications of one application, where the
      application ID is ``-other_user_id``.

    Results are ordered newest first and paged with ``skip`` / ``limit``.
    """
    me = current_user.id

    if other_user_id == 0:
        # Every system notification addressed to the user.
        criteria = (
            Message.receiver_id == me,
            Message.type == MessageType.NOTIFICATION,
        )
    elif other_user_id < 0:
        # Notifications from a single application.
        criteria = (
            Message.receiver_id == me,
            Message.type == MessageType.NOTIFICATION,
            Message.app_id == -other_user_id,
        )
    else:
        # Private messages in either direction.
        sent_by_me = and_(Message.sender_id == me, Message.receiver_id == other_user_id)
        sent_to_me = and_(Message.sender_id == other_user_id, Message.receiver_id == me)
        criteria = (or_(sent_by_me, sent_to_me),)

    rows = (
        db.query(Message)
        .options(joinedload(Message.app))
        .filter(*criteria)
        .order_by(Message.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )

    return [_process_message_content(row) for row in rows]
@router.get("/unread-count", response_model=int)
def get_unread_count(
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """Return how many unread messages the current user has received."""
    unread = db.query(Message).filter(
        Message.receiver_id == current_user.id,
        Message.is_read == False,  # noqa: E712 - SQL expression
    )
    return unread.count()
@router.put("/{message_id}/read", response_model=MessageResponse)
def mark_as_read(
    message_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """Mark one of the current user's received messages as read.

    A message that is already read is returned without being modified.

    Raises:
        HTTPException: 404 when no message with this ID is addressed to
            the current user.
    """
    owned_by_me = (
        Message.id == message_id,
        Message.receiver_id == current_user.id,
    )
    message = db.query(Message).filter(*owned_by_me).first()
    if not message:
        raise HTTPException(status_code=404, detail="Message not found")

    # Nothing to write when the message was read before.
    if message.is_read:
        return _process_message_content(message)

    message.is_read = True
    message.read_at = datetime.now()
    db.add(message)
    db.commit()
    db.refresh(message)
    return _process_message_content(message)
@router.put("/read-all", response_model=dict)
def mark_all_read(
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """Mark every unread message received by the current user as read.

    Returns:
        ``{"updated_count": n}`` where ``n`` is the value returned by the
        bulk update.
    """
    read_timestamp = datetime.now()
    unread = db.query(Message).filter(
        Message.receiver_id == current_user.id,
        Message.is_read == False,  # noqa: E712 - SQL expression
    )
    changes = {"is_read": True, "read_at": read_timestamp}
    updated = unread.update(changes, synchronize_session=False)
    db.commit()
    return {"updated_count": updated}
@router.delete("/{message_id}", response_model=MessageResponse)
def delete_message(
    message_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """Delete one of the current user's received messages and return it.

    Raises:
        HTTPException: 404 when no message with this ID is addressed to
            the current user.
    """
    owned_by_me = (
        Message.id == message_id,
        Message.receiver_id == current_user.id,
    )
    message = db.query(Message).filter(*owned_by_me).first()
    if not message:
        raise HTTPException(status_code=404, detail="Message not found")

    # Build the response first; the row is not readable once deleted.
    snapshot = _process_message_content(message)

    db.delete(message)
    db.commit()
    return snapshot
def _first_redirect_uri(raw_redirect_uris: Any) -> str:
    """Pick the callback base URL out of an application's ``redirect_uris``.

    The stored value may be a JSON list (first entry is used), a JSON
    string, or a plain non-JSON string (used after stripping whitespace).
    Returns ``""`` when no usable string can be extracted.
    """
    if not raw_redirect_uris:
        return ""
    try:
        uris = json.loads(raw_redirect_uris)
    except (json.JSONDecodeError, TypeError):
        # Not JSON: treat the stored text itself as the URI.
        return raw_redirect_uris.strip() if isinstance(raw_redirect_uris, str) else ""

    if isinstance(uris, list) and len(uris) > 0:
        candidate = uris[0]
    elif isinstance(uris, str):
        candidate = uris
    else:
        return ""
    # A non-string entry (number, object, null) cannot be parsed as a URL.
    return candidate if isinstance(candidate, str) else ""


@router.get("/{message_id}/callback-url", response_model=dict)
def get_message_callback_url(
    message_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """Return the callback URL for a message's action button.

    Performs the jump endpoint's logic inline: a ticket is generated for
    the current user and appended, together with the target as ``next``,
    to the application's redirect URI. The result is returned as JSON
    instead of a redirect.

    Returns:
        ``{"callback_url": <url>}``.

    Raises:
        HTTPException: 404 when the message or application is not found;
            403 when the current user is not the receiver; 400 when the
            message has no action URL, the action URL is not a jump link
            or lacks parameters, or the application has no usable
            redirect URI.
    """
    # 1. Load the message.
    message = db.query(Message).filter(Message.id == message_id).first()
    if not message:
        raise HTTPException(status_code=404, detail="消息未找到")

    # 2. Only the receiver may resolve the link.
    if message.receiver_id != current_user.id:
        raise HTTPException(status_code=403, detail="无权访问此消息")

    # 3. The message must carry an action URL (the jump endpoint URL).
    if not message.action_url:
        raise HTTPException(status_code=400, detail="此消息没有配置跳转链接")

    # 4. Extract app_id and redirect_to from the action URL.
    # Expected format: /api/v1/simple/sso/jump?app_id=xxx&redirect_to=xxx
    parsed = urlparse(message.action_url)
    if not parsed.path.endswith("/sso/jump"):
        raise HTTPException(status_code=400, detail="无效的跳转链接格式")

    query_params = parse_qs(parsed.query)
    app_id = query_params.get("app_id", [None])[0]
    redirect_to = query_params.get("redirect_to", [None])[0]
    if not app_id or not redirect_to:
        raise HTTPException(status_code=400, detail="跳转链接参数不完整")

    # 5. Resolve the target application.
    app = db.query(Application).filter(Application.app_id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail="应用未找到")

    # 6. Resolve the application's callback base URL.
    redirect_base = _first_redirect_uri(app.redirect_uris)
    if not redirect_base:
        raise HTTPException(status_code=400, detail="应用未配置回调地址")

    # 7. Generate the ticket for the logged-in user. This happens only
    # after all validation, so a failing request never mints a ticket.
    ticket = TicketService.generate_ticket(current_user.id, app_id)

    # 8. Build the callback URL, keeping the redirect URI's own query
    # parameters and adding/overriding ticket and next.
    parsed_uri = urlparse(redirect_base)
    callback_query_params = parse_qs(parsed_uri.query)
    callback_query_params['ticket'] = [ticket]
    callback_query_params['next'] = [redirect_to]

    new_query = urlencode(callback_query_params, doseq=True)
    callback_url = urlunparse((
        parsed_uri.scheme,
        parsed_uri.netloc,
        parsed_uri.path,
        parsed_uri.params,
        new_query,
        parsed_uri.fragment
    ))

    return {"callback_url": callback_url}
|