| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534 |
- from typing import List, Any
- import logging
- from fastapi import APIRouter, Depends, HTTPException, Body, BackgroundTasks, Request, Query
- from sqlalchemy.orm import Session
- from sqlalchemy import or_
- from sqlalchemy.exc import IntegrityError
- from app.api.v1 import deps
- from app.core import security
- from app.core.utils import generate_english_name, get_client_ip
- from app.models.user import User, UserRole
- from app.models.mapping import AppUserMapping
- from app.schemas.user import User as UserSchema, UserCreate, UserUpdate, UserList, PromoteUserRequest, BatchResetEnglishNameRequest
- from app.services.webhook_service import WebhookService
- from app.services.captcha_service import CaptchaService
- from app.services.log_service import LogService
- from app.schemas.operation_log import ActionType
- router = APIRouter()
- logger = logging.getLogger(__name__)
- @router.get("/search", response_model=List[UserSchema], summary="搜索用户")
- def search_users(
- keyword: str = Query(None),
- limit: int = 20,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 搜索可作为应用转让目标的用户(开发者或超级管理员)。
- """
- # Only Developers and Super Admins can search
- if current_user.role not in ["SUPER_ADMIN", "DEVELOPER"]:
- raise HTTPException(status_code=403, detail="权限不足")
- query = db.query(User).filter(
- User.is_deleted == 0,
- User.status == "ACTIVE",
- User.role.in_(["DEVELOPER", "SUPER_ADMIN"])
- )
-
- if keyword:
- query = query.filter(User.mobile.ilike(f"%{keyword}%"))
-
- # Exclude self
- query = query.filter(User.id != current_user.id)
-
- users = query.limit(limit).all()
- return users
- @router.get("/", response_model=UserList, summary="获取用户列表")
- def read_users(
- skip: int = 0,
- limit: int = 10,
- status: str = None,
- role: str = None,
- mobile: str = None,
- name: str = None,
- english_name: str = None,
- keyword: str = None,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取用户列表。只有超级管理员可以查看所有用户。
- 支持通过 mobile, name, english_name 精确/模糊筛选,
- 也支持通过 keyword 进行跨字段模糊搜索。
- """
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
- # Filter out soft-deleted users
- query = db.query(User).filter(User.is_deleted == 0)
-
- if status:
- query = query.filter(User.status == status)
- if role:
- query = query.filter(User.role == role)
-
- # Specific field filters (AND logic)
- if mobile:
- query = query.filter(User.mobile.ilike(f"%{mobile}%"))
- if name:
- query = query.filter(User.name.ilike(f"%{name}%"))
- if english_name:
- query = query.filter(User.english_name.ilike(f"%{english_name}%"))
-
- # Keyword search (OR logic across fields) - intersects with previous filters
- if keyword:
- query = query.filter(
- or_(
- User.mobile.ilike(f"%{keyword}%"),
- User.name.ilike(f"%{keyword}%"),
- User.english_name.ilike(f"%{keyword}%")
- )
- )
-
- total = query.count()
- users = query.order_by(User.id.desc()).offset(skip).limit(limit).all()
- return {"total": total, "items": users}
- @router.post("/", response_model=UserSchema, summary="创建用户")
- def create_user(
- *,
- db: Session = Depends(deps.get_db),
- user_in: UserCreate,
- request: Request,
- background_tasks: BackgroundTasks,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 创建新用户。只有超级管理员可以直接创建用户。
- 公开注册请使用 /open/register。
- """
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
- # Verify Admin Password
- if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
- logger.warning(f"管理员创建用户失败: 密码错误 (Admin: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="管理员密码错误")
- user = db.query(User).filter(User.mobile == user_in.mobile).first()
- if user:
- raise HTTPException(
- status_code=400,
- detail="该手机号已在系统中注册",
- )
-
- if user_in.password:
- user_in.password = user_in.password.strip()
- # Remove spaces from name
- if user_in.name:
- user_in.name = user_in.name.replace(" ", "")
- # Generate default name if missing
- if not user_in.name:
- random_suffix = security.generate_alphanumeric_password(6)
- user_in.name = f"用户{random_suffix}"
- # Ensure unique name
- if user_in.name:
- original_name = user_in.name
- counter = 1
- while db.query(User).filter(User.name == user_in.name, User.is_deleted == 0).first():
- user_in.name = f"{original_name}{counter}"
- counter += 1
- english_name = user_in.english_name
- if not english_name and user_in.name:
- english_name = generate_english_name(user_in.name)
- # Ensure unique english_name
- if english_name:
- original_english_name = english_name
- counter = 1
- while db.query(User).filter(User.english_name == english_name, User.is_deleted == 0).first():
- english_name = f"{original_english_name}{counter}"
- counter += 1
- db_user = User(
- mobile=user_in.mobile,
- name=user_in.name,
- english_name=english_name,
- password_hash=security.get_password_hash(user_in.password),
- status="ACTIVE", # Admin created users are active by default
- role=user_in.role or UserRole.ORDINARY_USER
- )
- db.add(db_user)
- db.commit()
- db.refresh(db_user)
-
- # Log Operation
- LogService.create_log(
- db=db,
- operator_id=current_user.id,
- action_type=ActionType.MANUAL_ADD,
- target_user_id=db_user.id,
- target_mobile=db_user.mobile,
- ip_address=get_client_ip(request),
- details={"role": db_user.role}
- )
-
- logger.info(f"管理员创建用户成功: {db_user.mobile} (Role: {db_user.role})")
- return db_user
- @router.post("/batch/reset-english-name", summary="批量重置用户英文名")
- def batch_reset_english_name(
- *,
- db: Session = Depends(deps.get_db),
- req: BatchResetEnglishNameRequest,
- request: Request,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 批量重置选中用户的英文名。
- 规则:根据姓名生成拼音,如有重复自动追加数字后缀。
- 需要管理员密码验证。
- """
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
- # Verify Admin Password
- if not security.verify_password(req.admin_password, current_user.password_hash):
- logger.warning(f"批量重置英文名失败: 密码错误 (Admin: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="管理员密码错误")
- if not req.user_ids:
- raise HTTPException(status_code=400, detail="请选择用户")
- success_count = 0
- users = db.query(User).filter(User.id.in_(req.user_ids), User.is_deleted == 0).all()
-
- for user in users:
- if not user.name:
- continue
-
- old_english_name = user.english_name
- new_english_name = generate_english_name(user.name)
-
- # Uniqueness check
- if new_english_name:
- original_base = new_english_name
- counter = 1
-
- # Helper to check existence
- def check_exists(name, current_id):
- return db.query(User).filter(
- User.english_name == name,
- User.is_deleted == 0,
- User.id != current_id
- ).first()
- while check_exists(new_english_name, user.id):
- new_english_name = f"{original_base}{counter}"
- counter += 1
-
- if old_english_name != new_english_name:
- user.english_name = new_english_name
- db.add(user)
-
- # Log
- LogService.create_log(
- db=db,
- operator_id=current_user.id,
- action_type=ActionType.UPDATE,
- target_user_id=user.id,
- target_mobile=user.mobile,
- ip_address=get_client_ip(request),
- details={
- "field": "english_name",
- "old": old_english_name,
- "new": new_english_name,
- "reason": "BATCH_RESET"
- }
- )
- success_count += 1
-
- db.commit()
- logger.info(f"批量重置英文名完成: 成功 {success_count} 个 (Admin: {current_user.mobile})")
-
- return {"success": True, "count": success_count}
- @router.put("/{user_id}", response_model=UserSchema, summary="更新用户")
- def update_user(
- *,
- db: Session = Depends(deps.get_db),
- user_id: int,
- user_in: UserUpdate,
- request: Request,
- background_tasks: BackgroundTasks,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 更新用户信息。超级管理员可以更新任何人,用户只能更新自己。
- """
- user = db.query(User).filter(User.id == user_id).first()
- if not user:
- raise HTTPException(status_code=404, detail="用户不存在")
- # Permission Check
- if current_user.role != "SUPER_ADMIN" and current_user.id != user_id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- update_data = user_in.model_dump(exclude_unset=True)
-
- # Track actions for logging
- actions = []
-
- # Only Super Admin can change mobile
- if "mobile" in update_data:
- if current_user.role != "SUPER_ADMIN":
- del update_data["mobile"]
- else:
- # Require admin password for mobile change
- if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
- logger.warning(f"修改用户手机号失败: 管理员密码错误 (Target: {user.mobile})")
- raise HTTPException(status_code=403, detail="管理员密码错误")
- # Check uniqueness
- existing_user = db.query(User).filter(User.mobile == update_data["mobile"]).first()
- if existing_user and existing_user.id != user_id:
- raise HTTPException(status_code=400, detail="该手机号已存在")
-
- if user.mobile != update_data["mobile"]:
- old_mobile = user.mobile
- new_mobile = update_data["mobile"]
-
- # Update Mappings first (logic: user mobile change should propagate to mappings)
- # Find mappings where mapped_key matches old_mobile
- mappings = db.query(AppUserMapping).filter(
- AppUserMapping.user_id == user.id,
- AppUserMapping.mapped_key == old_mobile
- ).all()
-
- for mapping in mappings:
- mapping.mapped_key = new_mobile
- db.add(mapping)
-
- try:
- db.flush()
- except IntegrityError:
- db.rollback()
- logger.error(f"修改手机号失败: 映射冲突 ({old_mobile} -> {new_mobile})")
- raise HTTPException(status_code=400, detail="修改失败:新手机号在某些应用中已存在映射关联")
- actions.append((ActionType.UPDATE, {"field": "mobile", "old": old_mobile, "new": new_mobile}))
- # Only Super Admin can change status or role
- if "status" in update_data:
- if current_user.role != "SUPER_ADMIN":
- del update_data["status"]
- elif update_data["status"] == "DISABLED" and user_id == current_user.id:
- raise HTTPException(status_code=400, detail="超级管理员不能禁用自己")
- else:
- # Require admin password for status change
- if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
- logger.warning(f"修改用户状态失败: 密码错误")
- raise HTTPException(status_code=403, detail="管理员密码错误")
-
- # Add Log Action
- action_type = ActionType.DISABLE if update_data["status"] == "DISABLED" else ActionType.ENABLE
- actions.append((action_type, {"old": user.status, "new": update_data["status"]}))
-
- if "role" in update_data:
- if current_user.role != "SUPER_ADMIN":
- del update_data["role"]
- elif user_id == current_user.id and update_data["role"] != "SUPER_ADMIN":
- # Prevent demoting self
- raise HTTPException(status_code=400, detail="超级管理员不能降级自己")
- else:
- # Require admin password for role change
- if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
- logger.warning(f"修改用户角色失败: 密码错误")
- raise HTTPException(status_code=403, detail="管理员密码错误")
-
- actions.append((ActionType.CHANGE_ROLE, {"old": user.role, "new": update_data["role"]}))
- if "admin_password" in update_data:
- del update_data["admin_password"]
- if "is_deleted" in update_data:
- if current_user.role != "SUPER_ADMIN":
- del update_data["is_deleted"]
- elif update_data["is_deleted"] == 1 and user_id == current_user.id:
- raise HTTPException(status_code=400, detail="超级管理员不能删除自己")
- elif update_data["is_deleted"] == 1:
- actions.append((ActionType.DELETE, {}))
- if "password" in update_data:
- password = update_data["password"]
- if password:
- password = password.strip()
- hashed_password = security.get_password_hash(password)
- del update_data["password"]
- user.password_hash = hashed_password
-
- # Auto-generate english_name if name changed and english_name not provided
- if "name" in update_data and update_data["name"]:
- update_data["name"] = update_data["name"].replace(" ", "")
- if "english_name" not in update_data or not update_data["english_name"]:
- update_data["english_name"] = generate_english_name(update_data["name"])
- for field, value in update_data.items():
- setattr(user, field, value)
-
- db.add(user)
- db.commit()
- db.refresh(user)
-
- # Trigger Webhook
- event_type = "UPDATE"
- if user.status == "DISABLED":
- event_type = "DISABLE"
- background_tasks.add_task(WebhookService.trigger_user_event, db, user.id, event_type)
-
- # Record Logs
- if current_user.role == "SUPER_ADMIN" and actions:
- for action_type, details in actions:
- LogService.create_log(
- db=db,
- operator_id=current_user.id,
- action_type=action_type,
- target_user_id=user.id,
- target_mobile=user.mobile,
- ip_address=get_client_ip(request),
- details=details
- )
- logger.info(f"更新用户信息成功: {user.mobile} (ID: {user.id})")
- return user
- @router.post("/{user_id}/promote", response_model=UserSchema, summary="提升用户权限")
- def promote_user(
- *,
- db: Session = Depends(deps.get_db),
- user_id: int,
- req: PromoteUserRequest,
- request: Request,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 将开发者提升为超级管理员。
- 需要验证(管理员密码 + 验证码)。
- """
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
-
- # 1. Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"提升权限失败: 密码错误 (Admin: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
-
- # 2. Verify Captcha
- if not CaptchaService.verify_captcha(req.captcha_id, req.captcha_code):
- logger.warning(f"提升权限失败: 验证码错误")
- raise HTTPException(status_code=400, detail="验证码错误")
-
- user = db.query(User).filter(User.id == user_id).first()
- if not user:
- raise HTTPException(status_code=404, detail="用户不存在")
-
- old_role = user.role
- user.role = "SUPER_ADMIN"
- user.status = "ACTIVE" # Ensure they are active
-
- db.add(user)
- db.commit()
- db.refresh(user)
-
- # Log Operation
- LogService.create_log(
- db=db,
- operator_id=current_user.id,
- action_type=ActionType.CHANGE_ROLE,
- target_user_id=user.id,
- target_mobile=user.mobile,
- ip_address=get_client_ip(request),
- details={"old": old_role, "new": "SUPER_ADMIN"}
- )
-
- logger.info(f"提升用户为超管: {user.mobile}")
- return user
- @router.delete("/{user_id}", response_model=UserSchema, summary="删除用户")
- def delete_user(
- *,
- db: Session = Depends(deps.get_db),
- user_id: int,
- request: Request,
- current_user: User = Depends(deps.get_current_active_user),
- background_tasks: BackgroundTasks,
- ):
- """
- 软删除用户。仅限超级管理员。
- """
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
-
- if user_id == current_user.id:
- raise HTTPException(status_code=400, detail="超级管理员不能删除自己")
-
- user = db.query(User).filter(User.id == user_id).first()
- if not user:
- raise HTTPException(status_code=404, detail="用户不存在")
-
- user.is_deleted = 1 # Using Integer 1 for True
- user.status = "DISABLED" # Also disable login
- db.add(user)
- db.commit()
-
- background_tasks.add_task(WebhookService.trigger_user_event, db, user.id, "DELETE")
-
- # Log Operation
- LogService.create_log(
- db=db,
- operator_id=current_user.id,
- action_type=ActionType.DELETE,
- target_user_id=user.id,
- target_mobile=user.mobile,
- ip_address=get_client_ip(request),
- details={"status": "DISABLED"}
- )
-
- logger.info(f"删除用户成功: {user.mobile}")
- return user
- @router.get("/me", response_model=UserSchema, summary="获取当前用户信息")
- def read_user_me(
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取当前登录用户的信息。
- """
- return current_user
- @router.get("/{user_id}", response_model=UserSchema, summary="根据ID获取用户")
- def read_user_by_id(
- user_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- db: Session = Depends(deps.get_db),
- ):
- """
- 根据用户ID获取特定用户信息。
- """
- user = db.query(User).filter(User.id == user_id).first()
- if not user:
- raise HTTPException(
- status_code=404,
- detail="该用户ID在系统中不存在",
- )
- return user
|