from typing import List, Any import logging from fastapi import APIRouter, Depends, HTTPException, Body, BackgroundTasks, Request, Query from sqlalchemy.orm import Session from sqlalchemy import or_ from sqlalchemy.exc import IntegrityError from app.api.v1 import deps from app.core import security from app.core.utils import generate_english_name, get_client_ip from app.models.user import User, UserRole from app.models.mapping import AppUserMapping from app.schemas.user import User as UserSchema, UserCreate, UserUpdate, UserList, PromoteUserRequest, BatchResetEnglishNameRequest from app.services.webhook_service import WebhookService from app.services.captcha_service import CaptchaService from app.services.log_service import LogService from app.schemas.operation_log import ActionType router = APIRouter() logger = logging.getLogger(__name__) @router.get("/search", response_model=List[UserSchema], summary="搜索用户") def search_users( keyword: str = Query(None, alias="q"), limit: int = 20, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """ 搜索用户(支持姓名、手机号、英文名)。 """ query = db.query(User).filter( User.is_deleted == 0, User.status == "ACTIVE" ) if keyword: query = query.filter( or_( User.mobile.ilike(f"%{keyword}%"), User.name.ilike(f"%{keyword}%"), User.english_name.ilike(f"%{keyword}%") ) ) # Exclude self query = query.filter(User.id != current_user.id) users = query.limit(limit).all() return users @router.get("/", response_model=UserList, summary="获取用户列表") def read_users( skip: int = 0, limit: int = 10, status: str = None, role: str = None, mobile: str = None, name: str = None, english_name: str = None, keyword: str = None, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """ 获取用户列表。只有超级管理员可以查看所有用户。 支持通过 mobile, name, english_name 精确/模糊筛选, 也支持通过 keyword 进行跨字段模糊搜索。 """ if current_user.role != "SUPER_ADMIN": raise HTTPException(status_code=403, detail="权限不足") # Filter out soft-deleted users query = db.query(User).filter(User.is_deleted == 0) if status: query = query.filter(User.status == status) if role: query = query.filter(User.role == role) # Specific field filters (AND logic) if mobile: query = query.filter(User.mobile.ilike(f"%{mobile}%")) if name: query = query.filter(User.name.ilike(f"%{name}%")) if english_name: query = query.filter(User.english_name.ilike(f"%{english_name}%")) # Keyword search (OR logic across fields) - intersects with previous filters if keyword: query = query.filter( or_( User.mobile.ilike(f"%{keyword}%"), User.name.ilike(f"%{keyword}%"), User.english_name.ilike(f"%{keyword}%") ) ) total = query.count() users = query.order_by(User.id.desc()).offset(skip).limit(limit).all() return {"total": total, "items": users} @router.post("/", response_model=UserSchema, summary="创建用户") def create_user( *, db: Session = Depends(deps.get_db), user_in: UserCreate, request: Request, background_tasks: BackgroundTasks, current_user: User = Depends(deps.get_current_active_user), ): """ 创建新用户。只有超级管理员可以直接创建用户。 公开注册请使用 /open/register。 """ if current_user.role != "SUPER_ADMIN": raise HTTPException(status_code=403, detail="权限不足") # Verify Admin Password if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash): logger.warning(f"管理员创建用户失败: 密码错误 (Admin: {current_user.mobile})") raise HTTPException(status_code=403, detail="管理员密码错误") user = db.query(User).filter(User.mobile == user_in.mobile).first() if user: raise HTTPException( status_code=400, detail="该手机号已在系统中注册", ) if user_in.password: user_in.password = user_in.password.strip() # Remove spaces from name if user_in.name: user_in.name = user_in.name.replace(" ", "") # Generate default name if missing if not user_in.name: random_suffix = security.generate_alphanumeric_password(6) user_in.name = f"用户{random_suffix}" # Ensure unique name if user_in.name: original_name = user_in.name counter = 1 while db.query(User).filter(User.name == user_in.name, User.is_deleted == 0).first(): user_in.name = f"{original_name}{counter}" counter += 1 english_name = user_in.english_name if not english_name and user_in.name: english_name = generate_english_name(user_in.name) # Ensure unique english_name if english_name: original_english_name = english_name counter = 1 while db.query(User).filter(User.english_name == english_name, User.is_deleted == 0).first(): english_name = f"{original_english_name}{counter}" counter += 1 db_user = User( mobile=user_in.mobile, name=user_in.name, english_name=english_name, password_hash=security.get_password_hash(user_in.password), status="ACTIVE", # Admin created users are active by default role=user_in.role or UserRole.ORDINARY_USER ) db.add(db_user) db.commit() db.refresh(db_user) # Log Operation LogService.create_log( db=db, operator_id=current_user.id, action_type=ActionType.MANUAL_ADD, target_user_id=db_user.id, target_mobile=db_user.mobile, ip_address=get_client_ip(request), details={"role": db_user.role} ) logger.info(f"管理员创建用户成功: {db_user.mobile} (Role: {db_user.role})") return db_user @router.post("/batch/reset-english-name", summary="批量重置用户英文名") def batch_reset_english_name( *, db: Session = Depends(deps.get_db), req: BatchResetEnglishNameRequest, request: Request, current_user: User = Depends(deps.get_current_active_user), ): """ 批量重置选中用户的英文名。 规则:根据姓名生成拼音,如有重复自动追加数字后缀。 需要管理员密码验证。 """ if current_user.role != "SUPER_ADMIN": raise HTTPException(status_code=403, detail="权限不足") # Verify Admin Password if not security.verify_password(req.admin_password, current_user.password_hash): logger.warning(f"批量重置英文名失败: 密码错误 (Admin: {current_user.mobile})") raise HTTPException(status_code=403, detail="管理员密码错误") if not req.user_ids: raise HTTPException(status_code=400, detail="请选择用户") success_count = 0 users = db.query(User).filter(User.id.in_(req.user_ids), User.is_deleted == 0).all() for user in users: if not user.name: continue old_english_name = user.english_name new_english_name = generate_english_name(user.name) # Uniqueness check if new_english_name: original_base = new_english_name counter = 1 # Helper to check existence def check_exists(name, current_id): return db.query(User).filter( User.english_name == name, User.is_deleted == 0, User.id != current_id ).first() while check_exists(new_english_name, user.id): new_english_name = f"{original_base}{counter}" counter += 1 if old_english_name != new_english_name: user.english_name = new_english_name db.add(user) # Log LogService.create_log( db=db, operator_id=current_user.id, action_type=ActionType.UPDATE, target_user_id=user.id, target_mobile=user.mobile, ip_address=get_client_ip(request), details={ "field": "english_name", "old": old_english_name, "new": new_english_name, "reason": "BATCH_RESET" } ) success_count += 1 db.commit() logger.info(f"批量重置英文名完成: 成功 {success_count} 个 (Admin: {current_user.mobile})") return {"success": True, "count": success_count} @router.put("/{user_id}", response_model=UserSchema, summary="更新用户") def update_user( *, db: Session = Depends(deps.get_db), user_id: int, user_in: UserUpdate, request: Request, background_tasks: BackgroundTasks, current_user: User = Depends(deps.get_current_active_user), ): """ 更新用户信息。超级管理员可以更新任何人,用户只能更新自己。 """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="用户不存在") # Permission Check if current_user.role != "SUPER_ADMIN" and current_user.id != user_id: raise HTTPException(status_code=403, detail="权限不足") update_data = user_in.model_dump(exclude_unset=True) # Track actions for logging actions = [] # Only Super Admin can change mobile if "mobile" in update_data: if current_user.role != "SUPER_ADMIN": del update_data["mobile"] else: # Require admin password for mobile change if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash): logger.warning(f"修改用户手机号失败: 管理员密码错误 (Target: {user.mobile})") raise HTTPException(status_code=403, detail="管理员密码错误") # Check uniqueness existing_user = db.query(User).filter(User.mobile == update_data["mobile"]).first() if existing_user and existing_user.id != user_id: raise HTTPException(status_code=400, detail="该手机号已存在") if user.mobile != update_data["mobile"]: old_mobile = user.mobile new_mobile = update_data["mobile"] # Update Mappings first (logic: user mobile change should propagate to mappings) # Find mappings where mapped_key matches old_mobile mappings = db.query(AppUserMapping).filter( AppUserMapping.user_id == user.id, AppUserMapping.mapped_key == old_mobile ).all() for mapping in mappings: mapping.mapped_key = new_mobile db.add(mapping) try: db.flush() except IntegrityError: db.rollback() logger.error(f"修改手机号失败: 映射冲突 ({old_mobile} -> {new_mobile})") raise HTTPException(status_code=400, detail="修改失败:新手机号在某些应用中已存在映射关联") actions.append((ActionType.UPDATE, {"field": "mobile", "old": old_mobile, "new": new_mobile})) # Only Super Admin can change status or role if "status" in update_data: if current_user.role != "SUPER_ADMIN": del update_data["status"] elif update_data["status"] == "DISABLED" and user_id == current_user.id: raise HTTPException(status_code=400, detail="超级管理员不能禁用自己") else: # Require admin password for status change if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash): logger.warning(f"修改用户状态失败: 密码错误") raise HTTPException(status_code=403, detail="管理员密码错误") # Add Log Action action_type = ActionType.DISABLE if update_data["status"] == "DISABLED" else ActionType.ENABLE actions.append((action_type, {"old": user.status, "new": update_data["status"]})) if "role" in update_data: if current_user.role != "SUPER_ADMIN": del update_data["role"] elif user_id == current_user.id and update_data["role"] != "SUPER_ADMIN": # Prevent demoting self raise HTTPException(status_code=400, detail="超级管理员不能降级自己") else: # Require admin password for role change if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash): logger.warning(f"修改用户角色失败: 密码错误") raise HTTPException(status_code=403, detail="管理员密码错误") actions.append((ActionType.CHANGE_ROLE, {"old": user.role, "new": update_data["role"]})) if "admin_password" in update_data: del update_data["admin_password"] if "is_deleted" in update_data: if current_user.role != "SUPER_ADMIN": del update_data["is_deleted"] elif update_data["is_deleted"] == 1 and user_id == current_user.id: raise HTTPException(status_code=400, detail="超级管理员不能删除自己") elif update_data["is_deleted"] == 1: actions.append((ActionType.DELETE, {})) if "password" in update_data: password = update_data["password"] if password: password = password.strip() hashed_password = security.get_password_hash(password) del update_data["password"] user.password_hash = hashed_password # Auto-generate english_name if name changed and english_name not provided if "name" in update_data and update_data["name"]: update_data["name"] = update_data["name"].replace(" ", "") if "english_name" not in update_data or not update_data["english_name"]: update_data["english_name"] = generate_english_name(update_data["name"]) for field, value in update_data.items(): setattr(user, field, value) db.add(user) db.commit() db.refresh(user) # Trigger Webhook event_type = "UPDATE" if user.status == "DISABLED": event_type = "DISABLE" background_tasks.add_task(WebhookService.trigger_user_event, db, user.id, event_type) # Record Logs if current_user.role == "SUPER_ADMIN" and actions: for action_type, details in actions: LogService.create_log( db=db, operator_id=current_user.id, action_type=action_type, target_user_id=user.id, target_mobile=user.mobile, ip_address=get_client_ip(request), details=details ) logger.info(f"更新用户信息成功: {user.mobile} (ID: {user.id})") return user @router.post("/{user_id}/promote", response_model=UserSchema, summary="提升用户权限") def promote_user( *, db: Session = Depends(deps.get_db), user_id: int, req: PromoteUserRequest, request: Request, current_user: User = Depends(deps.get_current_active_user), ): """ 将开发者提升为超级管理员。 需要验证(管理员密码 + 验证码)。 """ if current_user.role != "SUPER_ADMIN": raise HTTPException(status_code=403, detail="权限不足") # 1. Verify Password if not security.verify_password(req.password, current_user.password_hash): logger.warning(f"提升权限失败: 密码错误 (Admin: {current_user.mobile})") raise HTTPException(status_code=403, detail="密码错误") # 2. Verify Captcha if not CaptchaService.verify_captcha(req.captcha_id, req.captcha_code): logger.warning(f"提升权限失败: 验证码错误") raise HTTPException(status_code=400, detail="验证码错误") user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="用户不存在") old_role = user.role user.role = "SUPER_ADMIN" user.status = "ACTIVE" # Ensure they are active db.add(user) db.commit() db.refresh(user) # Log Operation LogService.create_log( db=db, operator_id=current_user.id, action_type=ActionType.CHANGE_ROLE, target_user_id=user.id, target_mobile=user.mobile, ip_address=get_client_ip(request), details={"old": old_role, "new": "SUPER_ADMIN"} ) logger.info(f"提升用户为超管: {user.mobile}") return user @router.delete("/{user_id}", response_model=UserSchema, summary="删除用户") def delete_user( *, db: Session = Depends(deps.get_db), user_id: int, request: Request, current_user: User = Depends(deps.get_current_active_user), background_tasks: BackgroundTasks, ): """ 软删除用户。仅限超级管理员。 """ if current_user.role != "SUPER_ADMIN": raise HTTPException(status_code=403, detail="权限不足") if user_id == current_user.id: raise HTTPException(status_code=400, detail="超级管理员不能删除自己") user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="用户不存在") user.is_deleted = 1 # Using Integer 1 for True user.status = "DISABLED" # Also disable login db.add(user) db.commit() background_tasks.add_task(WebhookService.trigger_user_event, db, user.id, "DELETE") # Log Operation LogService.create_log( db=db, operator_id=current_user.id, action_type=ActionType.DELETE, target_user_id=user.id, target_mobile=user.mobile, ip_address=get_client_ip(request), details={"status": "DISABLED"} ) logger.info(f"删除用户成功: {user.mobile}") return user @router.get("/me", response_model=UserSchema, summary="获取当前用户信息") def read_user_me( current_user: User = Depends(deps.get_current_active_user), ): """ 获取当前登录用户的信息。 """ return current_user @router.get("/{user_id}", response_model=UserSchema, summary="根据ID获取用户") def read_user_by_id( user_id: int, current_user: User = Depends(deps.get_current_active_user), db: Session = Depends(deps.get_db), ): """ 根据用户ID获取特定用户信息。 """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=404, detail="该用户ID在系统中不存在", ) return user