users.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. from typing import List, Any
  2. import logging
  3. from fastapi import APIRouter, Depends, HTTPException, Body, BackgroundTasks, Request, Query
  4. from sqlalchemy.orm import Session
  5. from sqlalchemy import or_
  6. from sqlalchemy.exc import IntegrityError
  7. from app.api.v1 import deps
  8. from app.core import security
  9. from app.core.utils import generate_english_name, get_client_ip
  10. from app.models.user import User, UserRole
  11. from app.models.mapping import AppUserMapping
  12. from app.schemas.user import User as UserSchema, UserCreate, UserUpdate, UserList, PromoteUserRequest, BatchResetEnglishNameRequest, DeleteUserRequest
  13. from app.services.webhook_service import WebhookService
  14. from app.services.captcha_service import CaptchaService
  15. from app.services.sms_service import SmsService
  16. from app.services.log_service import LogService
  17. from app.schemas.operation_log import ActionType
  18. router = APIRouter()
  19. logger = logging.getLogger(__name__)
  20. @router.get("/search", response_model=List[UserSchema], summary="搜索用户")
  21. def search_users(
  22. keyword: str = Query(None, alias="q"),
  23. limit: int = 20,
  24. db: Session = Depends(deps.get_db),
  25. current_subject: deps.AuthSubject = Depends(deps.get_current_user_or_app_by_api_key),
  26. ):
  27. """
  28. 搜索用户(支持姓名、手机号、英文名)。
  29. 支持用户 JWT Token 和应用 API Key (X-App-Access-Token) 两种认证方式。
  30. """
  31. query = db.query(User).filter(
  32. User.is_deleted == 0,
  33. User.status == "ACTIVE"
  34. )
  35. if keyword:
  36. query = query.filter(
  37. or_(
  38. User.mobile.ilike(f"%{keyword}%"),
  39. User.name.ilike(f"%{keyword}%"),
  40. User.english_name.ilike(f"%{keyword}%")
  41. )
  42. )
  43. # 如果是用户认证,排除自己;如果是应用认证,不排除
  44. if isinstance(current_subject, User):
  45. query = query.filter(User.id != current_subject.id)
  46. users = query.limit(limit).all()
  47. return users
  48. @router.get("/", response_model=UserList, summary="获取用户列表")
  49. def read_users(
  50. skip: int = 0,
  51. limit: int = 10,
  52. status: str = None,
  53. role: str = None,
  54. mobile: str = None,
  55. name: str = None,
  56. english_name: str = None,
  57. keyword: str = None,
  58. db: Session = Depends(deps.get_db),
  59. current_user: User = Depends(deps.get_current_active_user),
  60. ):
  61. """
  62. 获取用户列表。已登录用户即可访问。
  63. 支持通过 mobile, name, english_name 精确/模糊筛选,
  64. 也支持通过 keyword 进行跨字段模糊搜索。
  65. """
  66. # Filter out soft-deleted users
  67. query = db.query(User).filter(User.is_deleted == 0)
  68. if status:
  69. query = query.filter(User.status == status)
  70. if role:
  71. query = query.filter(User.role == role)
  72. # Specific field filters (AND logic)
  73. if mobile:
  74. query = query.filter(User.mobile.ilike(f"%{mobile}%"))
  75. if name:
  76. query = query.filter(User.name.ilike(f"%{name}%"))
  77. if english_name:
  78. query = query.filter(User.english_name.ilike(f"%{english_name}%"))
  79. # Keyword search (OR logic across fields) - intersects with previous filters
  80. if keyword:
  81. query = query.filter(
  82. or_(
  83. User.mobile.ilike(f"%{keyword}%"),
  84. User.name.ilike(f"%{keyword}%"),
  85. User.english_name.ilike(f"%{keyword}%")
  86. )
  87. )
  88. total = query.count()
  89. users = query.order_by(User.id.desc()).offset(skip).limit(limit).all()
  90. return {"total": total, "items": users}
  91. @router.post("/", response_model=UserSchema, summary="创建用户")
  92. def create_user(
  93. *,
  94. db: Session = Depends(deps.get_db),
  95. user_in: UserCreate,
  96. request: Request,
  97. background_tasks: BackgroundTasks,
  98. current_user: User = Depends(deps.get_current_active_user),
  99. ):
  100. """
  101. 创建新用户。只有超级管理员可以直接创建用户。
  102. 公开注册请使用 /open/register。
  103. """
  104. if current_user.role != "SUPER_ADMIN":
  105. raise HTTPException(status_code=403, detail="权限不足")
  106. # Verify Admin Password
  107. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  108. logger.warning(f"管理员创建用户失败: 密码错误 (Admin: {current_user.mobile})")
  109. raise HTTPException(status_code=403, detail="管理员密码错误")
  110. user = db.query(User).filter(User.mobile == user_in.mobile).first()
  111. if user:
  112. raise HTTPException(
  113. status_code=400,
  114. detail="该手机号已在系统中注册",
  115. )
  116. if user_in.password:
  117. user_in.password = user_in.password.strip()
  118. # Remove spaces from name
  119. if user_in.name:
  120. user_in.name = user_in.name.replace(" ", "")
  121. # Generate default name if missing
  122. if not user_in.name:
  123. random_suffix = security.generate_alphanumeric_password(6)
  124. user_in.name = f"用户{random_suffix}"
  125. # Ensure unique name
  126. if user_in.name:
  127. original_name = user_in.name
  128. counter = 1
  129. while db.query(User).filter(User.name == user_in.name, User.is_deleted == 0).first():
  130. user_in.name = f"{original_name}{counter}"
  131. counter += 1
  132. english_name = user_in.english_name
  133. if not english_name and user_in.name:
  134. english_name = generate_english_name(user_in.name)
  135. # Ensure unique english_name
  136. if english_name:
  137. original_english_name = english_name
  138. counter = 1
  139. while db.query(User).filter(User.english_name == english_name, User.is_deleted == 0).first():
  140. english_name = f"{original_english_name}{counter}"
  141. counter += 1
  142. db_user = User(
  143. mobile=user_in.mobile,
  144. name=user_in.name,
  145. english_name=english_name,
  146. password_hash=security.get_password_hash(user_in.password),
  147. status="ACTIVE", # Admin created users are active by default
  148. role=user_in.role or UserRole.ORDINARY_USER
  149. )
  150. db.add(db_user)
  151. db.commit()
  152. db.refresh(db_user)
  153. # Log Operation
  154. LogService.create_log(
  155. db=db,
  156. operator_id=current_user.id,
  157. action_type=ActionType.MANUAL_ADD,
  158. target_user_id=db_user.id,
  159. target_mobile=db_user.mobile,
  160. ip_address=get_client_ip(request),
  161. details={"role": db_user.role}
  162. )
  163. logger.info(f"管理员创建用户成功: {db_user.mobile} (Role: {db_user.role})")
  164. return db_user
  165. @router.post("/batch/reset-english-name", summary="批量重置用户英文名")
  166. def batch_reset_english_name(
  167. *,
  168. db: Session = Depends(deps.get_db),
  169. req: BatchResetEnglishNameRequest,
  170. request: Request,
  171. current_user: User = Depends(deps.get_current_active_user),
  172. ):
  173. """
  174. 批量重置选中用户的英文名。
  175. 规则:根据姓名生成拼音,如有重复自动追加数字后缀。
  176. 需要管理员密码验证。
  177. """
  178. if current_user.role != "SUPER_ADMIN":
  179. raise HTTPException(status_code=403, detail="权限不足")
  180. # Verify Admin Password
  181. if not security.verify_password(req.admin_password, current_user.password_hash):
  182. logger.warning(f"批量重置英文名失败: 密码错误 (Admin: {current_user.mobile})")
  183. raise HTTPException(status_code=403, detail="管理员密码错误")
  184. if not req.user_ids:
  185. raise HTTPException(status_code=400, detail="请选择用户")
  186. success_count = 0
  187. users = db.query(User).filter(User.id.in_(req.user_ids), User.is_deleted == 0).all()
  188. for user in users:
  189. if not user.name:
  190. continue
  191. old_english_name = user.english_name
  192. new_english_name = generate_english_name(user.name)
  193. # Uniqueness check
  194. if new_english_name:
  195. original_base = new_english_name
  196. counter = 1
  197. # Helper to check existence
  198. def check_exists(name, current_id):
  199. return db.query(User).filter(
  200. User.english_name == name,
  201. User.is_deleted == 0,
  202. User.id != current_id
  203. ).first()
  204. while check_exists(new_english_name, user.id):
  205. new_english_name = f"{original_base}{counter}"
  206. counter += 1
  207. if old_english_name != new_english_name:
  208. user.english_name = new_english_name
  209. db.add(user)
  210. # Log
  211. LogService.create_log(
  212. db=db,
  213. operator_id=current_user.id,
  214. action_type=ActionType.UPDATE,
  215. target_user_id=user.id,
  216. target_mobile=user.mobile,
  217. ip_address=get_client_ip(request),
  218. details={
  219. "field": "english_name",
  220. "old": old_english_name,
  221. "new": new_english_name,
  222. "reason": "BATCH_RESET"
  223. }
  224. )
  225. success_count += 1
  226. db.commit()
  227. logger.info(f"批量重置英文名完成: 成功 {success_count} 个 (Admin: {current_user.mobile})")
  228. return {"success": True, "count": success_count}
  229. @router.put("/{user_id}", response_model=UserSchema, summary="更新用户")
  230. def update_user(
  231. *,
  232. db: Session = Depends(deps.get_db),
  233. user_id: int,
  234. user_in: UserUpdate,
  235. request: Request,
  236. background_tasks: BackgroundTasks,
  237. current_user: User = Depends(deps.get_current_active_user),
  238. ):
  239. """
  240. 更新用户信息。超级管理员可以更新任何人,用户只能更新自己。
  241. """
  242. user = db.query(User).filter(User.id == user_id).first()
  243. if not user:
  244. raise HTTPException(status_code=404, detail="用户不存在")
  245. # Permission Check
  246. if current_user.role != "SUPER_ADMIN" and current_user.id != user_id:
  247. raise HTTPException(status_code=403, detail="权限不足")
  248. update_data = user_in.model_dump(exclude_unset=True)
  249. # Track actions for logging
  250. actions = []
  251. # Only Super Admin can change mobile
  252. if "mobile" in update_data:
  253. if current_user.role != "SUPER_ADMIN":
  254. del update_data["mobile"]
  255. else:
  256. # Require admin password for mobile change
  257. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  258. logger.warning(f"修改用户手机号失败: 管理员密码错误 (Target: {user.mobile})")
  259. raise HTTPException(status_code=403, detail="管理员密码错误")
  260. # Check uniqueness
  261. existing_user = db.query(User).filter(User.mobile == update_data["mobile"]).first()
  262. if existing_user and existing_user.id != user_id:
  263. raise HTTPException(status_code=400, detail="该手机号已存在")
  264. if user.mobile != update_data["mobile"]:
  265. old_mobile = user.mobile
  266. new_mobile = update_data["mobile"]
  267. # Update Mappings first (logic: user mobile change should propagate to mappings)
  268. # Find mappings where mapped_key matches old_mobile
  269. mappings = db.query(AppUserMapping).filter(
  270. AppUserMapping.user_id == user.id,
  271. AppUserMapping.mapped_key == old_mobile
  272. ).all()
  273. for mapping in mappings:
  274. mapping.mapped_key = new_mobile
  275. db.add(mapping)
  276. try:
  277. db.flush()
  278. except IntegrityError:
  279. db.rollback()
  280. logger.error(f"修改手机号失败: 映射冲突 ({old_mobile} -> {new_mobile})")
  281. raise HTTPException(status_code=400, detail="修改失败:新手机号在某些应用中已存在映射关联")
  282. actions.append((ActionType.UPDATE, {"field": "mobile", "old": old_mobile, "new": new_mobile}))
  283. # Only Super Admin can change status or role
  284. if "status" in update_data:
  285. if current_user.role != "SUPER_ADMIN":
  286. del update_data["status"]
  287. elif update_data["status"] == "DISABLED" and user_id == current_user.id:
  288. raise HTTPException(status_code=400, detail="超级管理员不能禁用自己")
  289. else:
  290. # Require admin password for status change
  291. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  292. logger.warning(f"修改用户状态失败: 密码错误")
  293. raise HTTPException(status_code=403, detail="管理员密码错误")
  294. # Add Log Action
  295. action_type = ActionType.DISABLE if update_data["status"] == "DISABLED" else ActionType.ENABLE
  296. actions.append((action_type, {"old": user.status, "new": update_data["status"]}))
  297. if "role" in update_data:
  298. if current_user.role != "SUPER_ADMIN":
  299. del update_data["role"]
  300. elif user_id == current_user.id and update_data["role"] != "SUPER_ADMIN":
  301. # Prevent demoting self
  302. raise HTTPException(status_code=400, detail="超级管理员不能降级自己")
  303. else:
  304. # Require admin password for role change
  305. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  306. logger.warning(f"修改用户角色失败: 密码错误")
  307. raise HTTPException(status_code=403, detail="管理员密码错误")
  308. actions.append((ActionType.CHANGE_ROLE, {"old": user.role, "new": update_data["role"]}))
  309. if "admin_password" in update_data:
  310. del update_data["admin_password"]
  311. # 删除用户请使用 POST /users/{id}/delete(密码+短信),不再接受通过更新接口软删除
  312. if "is_deleted" in update_data:
  313. del update_data["is_deleted"]
  314. if "password" in update_data:
  315. password = update_data["password"]
  316. if password:
  317. password = password.strip()
  318. hashed_password = security.get_password_hash(password)
  319. del update_data["password"]
  320. user.password_hash = hashed_password
  321. # Auto-generate english_name if name changed and english_name not provided
  322. if "name" in update_data and update_data["name"]:
  323. update_data["name"] = update_data["name"].replace(" ", "")
  324. if "english_name" not in update_data or not update_data["english_name"]:
  325. update_data["english_name"] = generate_english_name(update_data["name"])
  326. for field, value in update_data.items():
  327. setattr(user, field, value)
  328. db.add(user)
  329. # 用户禁用时关闭其全部应用映射;重新启用(含审核通过)时恢复为启用
  330. if "status" in update_data:
  331. if update_data["status"] == "DISABLED":
  332. n = (
  333. db.query(AppUserMapping)
  334. .filter(AppUserMapping.user_id == user.id)
  335. .update({"is_active": False}, synchronize_session=False)
  336. )
  337. if n:
  338. logger.info(f"用户禁用:已关闭 {n} 条应用映射 (user_id={user.id})")
  339. elif update_data["status"] == "ACTIVE":
  340. n = (
  341. db.query(AppUserMapping)
  342. .filter(AppUserMapping.user_id == user.id)
  343. .update({"is_active": True}, synchronize_session=False)
  344. )
  345. if n:
  346. logger.info(f"用户启用:已恢复 {n} 条应用映射 (user_id={user.id})")
  347. db.commit()
  348. db.refresh(user)
  349. # Trigger Webhook
  350. event_type = "UPDATE"
  351. if user.status == "DISABLED":
  352. event_type = "DISABLE"
  353. background_tasks.add_task(WebhookService.trigger_user_event, db, user.id, event_type)
  354. # Record Logs
  355. if current_user.role == "SUPER_ADMIN" and actions:
  356. for action_type, details in actions:
  357. LogService.create_log(
  358. db=db,
  359. operator_id=current_user.id,
  360. action_type=action_type,
  361. target_user_id=user.id,
  362. target_mobile=user.mobile,
  363. ip_address=get_client_ip(request),
  364. details=details
  365. )
  366. logger.info(f"更新用户信息成功: {user.mobile} (ID: {user.id})")
  367. return user
  368. @router.post("/{user_id}/promote", response_model=UserSchema, summary="提升用户权限")
  369. def promote_user(
  370. *,
  371. db: Session = Depends(deps.get_db),
  372. user_id: int,
  373. req: PromoteUserRequest,
  374. request: Request,
  375. current_user: User = Depends(deps.get_current_active_user),
  376. ):
  377. """
  378. 将开发者提升为超级管理员。
  379. 需要验证(管理员密码 + 验证码)。
  380. """
  381. if current_user.role != "SUPER_ADMIN":
  382. raise HTTPException(status_code=403, detail="权限不足")
  383. # 1. Verify Password
  384. if not security.verify_password(req.password, current_user.password_hash):
  385. logger.warning(f"提升权限失败: 密码错误 (Admin: {current_user.mobile})")
  386. raise HTTPException(status_code=403, detail="密码错误")
  387. # 2. Verify Captcha
  388. if not CaptchaService.verify_captcha(req.captcha_id, req.captcha_code):
  389. logger.warning(f"提升权限失败: 验证码错误")
  390. raise HTTPException(status_code=400, detail="验证码错误")
  391. user = db.query(User).filter(User.id == user_id).first()
  392. if not user:
  393. raise HTTPException(status_code=404, detail="用户不存在")
  394. old_role = user.role
  395. user.role = "SUPER_ADMIN"
  396. user.status = "ACTIVE" # Ensure they are active
  397. db.add(user)
  398. db.commit()
  399. db.refresh(user)
  400. # Log Operation
  401. LogService.create_log(
  402. db=db,
  403. operator_id=current_user.id,
  404. action_type=ActionType.CHANGE_ROLE,
  405. target_user_id=user.id,
  406. target_mobile=user.mobile,
  407. ip_address=get_client_ip(request),
  408. details={"old": old_role, "new": "SUPER_ADMIN"}
  409. )
  410. logger.info(f"提升用户为超管: {user.mobile}")
  411. return user
  412. @router.post("/{user_id}/delete", response_model=UserSchema, summary="删除普通用户(软删除)")
  413. def delete_user(
  414. *,
  415. db: Session = Depends(deps.get_db),
  416. user_id: int,
  417. body: DeleteUserRequest,
  418. request: Request,
  419. current_user: User = Depends(deps.get_current_active_user),
  420. background_tasks: BackgroundTasks,
  421. ):
  422. """
  423. 软删除普通用户(ORDINARY_USER)。仅限超级管理员。
  424. 需验证操作者登录密码与发送至操作者手机的短信验证码。不提供用户自助销户。
  425. """
  426. if current_user.role != "SUPER_ADMIN":
  427. raise HTTPException(status_code=403, detail="权限不足")
  428. if user_id == current_user.id:
  429. raise HTTPException(status_code=400, detail="不能删除自己")
  430. user = db.query(User).filter(User.id == user_id).first()
  431. if not user or user.is_deleted != 0:
  432. raise HTTPException(status_code=404, detail="用户不存在")
  433. # 管理员与开发者账号不可删除(仅允许删除普通用户)
  434. if user.role in (UserRole.SUPER_ADMIN, "SUPER_ADMIN"):
  435. raise HTTPException(status_code=403, detail="不能删除超级管理员")
  436. if user.role in (UserRole.DEVELOPER, "DEVELOPER"):
  437. raise HTTPException(status_code=403, detail="不能删除开发者")
  438. if user.role not in (UserRole.ORDINARY_USER, "ORDINARY_USER"):
  439. raise HTTPException(status_code=400, detail="仅可删除普通用户")
  440. if not security.verify_password(body.password, current_user.password_hash):
  441. logger.warning(f"删除用户失败: 密码错误 (Admin: {current_user.mobile})")
  442. raise HTTPException(status_code=403, detail="密码错误")
  443. if not SmsService.verify_code(current_user.mobile, body.sms_code):
  444. logger.warning(f"删除用户失败: 短信验证码错误 (Admin: {current_user.mobile})")
  445. raise HTTPException(status_code=400, detail="验证码错误或已过期")
  446. user.is_deleted = 1
  447. user.status = "DISABLED"
  448. db.add(user)
  449. db.commit()
  450. background_tasks.add_task(WebhookService.trigger_user_event, db, user.id, "DELETE")
  451. LogService.create_log(
  452. db=db,
  453. operator_id=current_user.id,
  454. action_type=ActionType.DELETE,
  455. target_user_id=user.id,
  456. target_mobile=user.mobile,
  457. ip_address=get_client_ip(request),
  458. details={"status": "DISABLED"}
  459. )
  460. logger.info(f"删除用户成功: {user.mobile}")
  461. return user
  462. @router.get("/me", response_model=UserSchema, summary="获取当前用户信息")
  463. def read_user_me(
  464. current_user: User = Depends(deps.get_current_active_user),
  465. ):
  466. """
  467. 获取当前登录用户的信息。
  468. """
  469. return current_user
  470. @router.get("/{user_id}", response_model=UserSchema, summary="根据ID获取用户")
  471. def read_user_by_id(
  472. user_id: int,
  473. current_user: User = Depends(deps.get_current_active_user),
  474. db: Session = Depends(deps.get_db),
  475. ):
  476. """
  477. 根据用户ID获取特定用户信息。
  478. """
  479. user = db.query(User).filter(User.id == user_id).first()
  480. if not user:
  481. raise HTTPException(
  482. status_code=404,
  483. detail="该用户ID在系统中不存在",
  484. )
  485. return user