users.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. from typing import List, Any
  2. import logging
  3. from fastapi import APIRouter, Depends, HTTPException, Body, BackgroundTasks, Request, Query
  4. from sqlalchemy.orm import Session
  5. from sqlalchemy import or_
  6. from sqlalchemy.exc import IntegrityError
  7. from app.api.v1 import deps
  8. from app.core import security
  9. from app.core.utils import generate_english_name, get_client_ip
  10. from app.models.user import User, UserRole
  11. from app.models.mapping import AppUserMapping
  12. from app.schemas.user import User as UserSchema, UserCreate, UserUpdate, UserList, PromoteUserRequest, BatchResetEnglishNameRequest
  13. from app.services.webhook_service import WebhookService
  14. from app.services.captcha_service import CaptchaService
  15. from app.services.log_service import LogService
  16. from app.schemas.operation_log import ActionType
  17. router = APIRouter()
  18. logger = logging.getLogger(__name__)
  19. @router.get("/search", response_model=List[UserSchema], summary="搜索用户")
  20. def search_users(
  21. keyword: str = Query(None, alias="q"),
  22. limit: int = 20,
  23. db: Session = Depends(deps.get_db),
  24. current_user: User = Depends(deps.get_current_active_user),
  25. ):
  26. """
  27. 搜索用户(支持姓名、手机号、英文名)。
  28. """
  29. query = db.query(User).filter(
  30. User.is_deleted == 0,
  31. User.status == "ACTIVE"
  32. )
  33. if keyword:
  34. query = query.filter(
  35. or_(
  36. User.mobile.ilike(f"%{keyword}%"),
  37. User.name.ilike(f"%{keyword}%"),
  38. User.english_name.ilike(f"%{keyword}%")
  39. )
  40. )
  41. # Exclude self
  42. query = query.filter(User.id != current_user.id)
  43. users = query.limit(limit).all()
  44. return users
  45. @router.get("/", response_model=UserList, summary="获取用户列表")
  46. def read_users(
  47. skip: int = 0,
  48. limit: int = 10,
  49. status: str = None,
  50. role: str = None,
  51. mobile: str = None,
  52. name: str = None,
  53. english_name: str = None,
  54. keyword: str = None,
  55. db: Session = Depends(deps.get_db),
  56. current_user: User = Depends(deps.get_current_active_user),
  57. ):
  58. """
  59. 获取用户列表。只有超级管理员可以查看所有用户。
  60. 支持通过 mobile, name, english_name 精确/模糊筛选,
  61. 也支持通过 keyword 进行跨字段模糊搜索。
  62. """
  63. if current_user.role != "SUPER_ADMIN":
  64. raise HTTPException(status_code=403, detail="权限不足")
  65. # Filter out soft-deleted users
  66. query = db.query(User).filter(User.is_deleted == 0)
  67. if status:
  68. query = query.filter(User.status == status)
  69. if role:
  70. query = query.filter(User.role == role)
  71. # Specific field filters (AND logic)
  72. if mobile:
  73. query = query.filter(User.mobile.ilike(f"%{mobile}%"))
  74. if name:
  75. query = query.filter(User.name.ilike(f"%{name}%"))
  76. if english_name:
  77. query = query.filter(User.english_name.ilike(f"%{english_name}%"))
  78. # Keyword search (OR logic across fields) - intersects with previous filters
  79. if keyword:
  80. query = query.filter(
  81. or_(
  82. User.mobile.ilike(f"%{keyword}%"),
  83. User.name.ilike(f"%{keyword}%"),
  84. User.english_name.ilike(f"%{keyword}%")
  85. )
  86. )
  87. total = query.count()
  88. users = query.order_by(User.id.desc()).offset(skip).limit(limit).all()
  89. return {"total": total, "items": users}
  90. @router.post("/", response_model=UserSchema, summary="创建用户")
  91. def create_user(
  92. *,
  93. db: Session = Depends(deps.get_db),
  94. user_in: UserCreate,
  95. request: Request,
  96. background_tasks: BackgroundTasks,
  97. current_user: User = Depends(deps.get_current_active_user),
  98. ):
  99. """
  100. 创建新用户。只有超级管理员可以直接创建用户。
  101. 公开注册请使用 /open/register。
  102. """
  103. if current_user.role != "SUPER_ADMIN":
  104. raise HTTPException(status_code=403, detail="权限不足")
  105. # Verify Admin Password
  106. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  107. logger.warning(f"管理员创建用户失败: 密码错误 (Admin: {current_user.mobile})")
  108. raise HTTPException(status_code=403, detail="管理员密码错误")
  109. user = db.query(User).filter(User.mobile == user_in.mobile).first()
  110. if user:
  111. raise HTTPException(
  112. status_code=400,
  113. detail="该手机号已在系统中注册",
  114. )
  115. if user_in.password:
  116. user_in.password = user_in.password.strip()
  117. # Remove spaces from name
  118. if user_in.name:
  119. user_in.name = user_in.name.replace(" ", "")
  120. # Generate default name if missing
  121. if not user_in.name:
  122. random_suffix = security.generate_alphanumeric_password(6)
  123. user_in.name = f"用户{random_suffix}"
  124. # Ensure unique name
  125. if user_in.name:
  126. original_name = user_in.name
  127. counter = 1
  128. while db.query(User).filter(User.name == user_in.name, User.is_deleted == 0).first():
  129. user_in.name = f"{original_name}{counter}"
  130. counter += 1
  131. english_name = user_in.english_name
  132. if not english_name and user_in.name:
  133. english_name = generate_english_name(user_in.name)
  134. # Ensure unique english_name
  135. if english_name:
  136. original_english_name = english_name
  137. counter = 1
  138. while db.query(User).filter(User.english_name == english_name, User.is_deleted == 0).first():
  139. english_name = f"{original_english_name}{counter}"
  140. counter += 1
  141. db_user = User(
  142. mobile=user_in.mobile,
  143. name=user_in.name,
  144. english_name=english_name,
  145. password_hash=security.get_password_hash(user_in.password),
  146. status="ACTIVE", # Admin created users are active by default
  147. role=user_in.role or UserRole.ORDINARY_USER
  148. )
  149. db.add(db_user)
  150. db.commit()
  151. db.refresh(db_user)
  152. # Log Operation
  153. LogService.create_log(
  154. db=db,
  155. operator_id=current_user.id,
  156. action_type=ActionType.MANUAL_ADD,
  157. target_user_id=db_user.id,
  158. target_mobile=db_user.mobile,
  159. ip_address=get_client_ip(request),
  160. details={"role": db_user.role}
  161. )
  162. logger.info(f"管理员创建用户成功: {db_user.mobile} (Role: {db_user.role})")
  163. return db_user
  164. @router.post("/batch/reset-english-name", summary="批量重置用户英文名")
  165. def batch_reset_english_name(
  166. *,
  167. db: Session = Depends(deps.get_db),
  168. req: BatchResetEnglishNameRequest,
  169. request: Request,
  170. current_user: User = Depends(deps.get_current_active_user),
  171. ):
  172. """
  173. 批量重置选中用户的英文名。
  174. 规则:根据姓名生成拼音,如有重复自动追加数字后缀。
  175. 需要管理员密码验证。
  176. """
  177. if current_user.role != "SUPER_ADMIN":
  178. raise HTTPException(status_code=403, detail="权限不足")
  179. # Verify Admin Password
  180. if not security.verify_password(req.admin_password, current_user.password_hash):
  181. logger.warning(f"批量重置英文名失败: 密码错误 (Admin: {current_user.mobile})")
  182. raise HTTPException(status_code=403, detail="管理员密码错误")
  183. if not req.user_ids:
  184. raise HTTPException(status_code=400, detail="请选择用户")
  185. success_count = 0
  186. users = db.query(User).filter(User.id.in_(req.user_ids), User.is_deleted == 0).all()
  187. for user in users:
  188. if not user.name:
  189. continue
  190. old_english_name = user.english_name
  191. new_english_name = generate_english_name(user.name)
  192. # Uniqueness check
  193. if new_english_name:
  194. original_base = new_english_name
  195. counter = 1
  196. # Helper to check existence
  197. def check_exists(name, current_id):
  198. return db.query(User).filter(
  199. User.english_name == name,
  200. User.is_deleted == 0,
  201. User.id != current_id
  202. ).first()
  203. while check_exists(new_english_name, user.id):
  204. new_english_name = f"{original_base}{counter}"
  205. counter += 1
  206. if old_english_name != new_english_name:
  207. user.english_name = new_english_name
  208. db.add(user)
  209. # Log
  210. LogService.create_log(
  211. db=db,
  212. operator_id=current_user.id,
  213. action_type=ActionType.UPDATE,
  214. target_user_id=user.id,
  215. target_mobile=user.mobile,
  216. ip_address=get_client_ip(request),
  217. details={
  218. "field": "english_name",
  219. "old": old_english_name,
  220. "new": new_english_name,
  221. "reason": "BATCH_RESET"
  222. }
  223. )
  224. success_count += 1
  225. db.commit()
  226. logger.info(f"批量重置英文名完成: 成功 {success_count} 个 (Admin: {current_user.mobile})")
  227. return {"success": True, "count": success_count}
  228. @router.put("/{user_id}", response_model=UserSchema, summary="更新用户")
  229. def update_user(
  230. *,
  231. db: Session = Depends(deps.get_db),
  232. user_id: int,
  233. user_in: UserUpdate,
  234. request: Request,
  235. background_tasks: BackgroundTasks,
  236. current_user: User = Depends(deps.get_current_active_user),
  237. ):
  238. """
  239. 更新用户信息。超级管理员可以更新任何人,用户只能更新自己。
  240. """
  241. user = db.query(User).filter(User.id == user_id).first()
  242. if not user:
  243. raise HTTPException(status_code=404, detail="用户不存在")
  244. # Permission Check
  245. if current_user.role != "SUPER_ADMIN" and current_user.id != user_id:
  246. raise HTTPException(status_code=403, detail="权限不足")
  247. update_data = user_in.model_dump(exclude_unset=True)
  248. # Track actions for logging
  249. actions = []
  250. # Only Super Admin can change mobile
  251. if "mobile" in update_data:
  252. if current_user.role != "SUPER_ADMIN":
  253. del update_data["mobile"]
  254. else:
  255. # Require admin password for mobile change
  256. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  257. logger.warning(f"修改用户手机号失败: 管理员密码错误 (Target: {user.mobile})")
  258. raise HTTPException(status_code=403, detail="管理员密码错误")
  259. # Check uniqueness
  260. existing_user = db.query(User).filter(User.mobile == update_data["mobile"]).first()
  261. if existing_user and existing_user.id != user_id:
  262. raise HTTPException(status_code=400, detail="该手机号已存在")
  263. if user.mobile != update_data["mobile"]:
  264. old_mobile = user.mobile
  265. new_mobile = update_data["mobile"]
  266. # Update Mappings first (logic: user mobile change should propagate to mappings)
  267. # Find mappings where mapped_key matches old_mobile
  268. mappings = db.query(AppUserMapping).filter(
  269. AppUserMapping.user_id == user.id,
  270. AppUserMapping.mapped_key == old_mobile
  271. ).all()
  272. for mapping in mappings:
  273. mapping.mapped_key = new_mobile
  274. db.add(mapping)
  275. try:
  276. db.flush()
  277. except IntegrityError:
  278. db.rollback()
  279. logger.error(f"修改手机号失败: 映射冲突 ({old_mobile} -> {new_mobile})")
  280. raise HTTPException(status_code=400, detail="修改失败:新手机号在某些应用中已存在映射关联")
  281. actions.append((ActionType.UPDATE, {"field": "mobile", "old": old_mobile, "new": new_mobile}))
  282. # Only Super Admin can change status or role
  283. if "status" in update_data:
  284. if current_user.role != "SUPER_ADMIN":
  285. del update_data["status"]
  286. elif update_data["status"] == "DISABLED" and user_id == current_user.id:
  287. raise HTTPException(status_code=400, detail="超级管理员不能禁用自己")
  288. else:
  289. # Require admin password for status change
  290. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  291. logger.warning(f"修改用户状态失败: 密码错误")
  292. raise HTTPException(status_code=403, detail="管理员密码错误")
  293. # Add Log Action
  294. action_type = ActionType.DISABLE if update_data["status"] == "DISABLED" else ActionType.ENABLE
  295. actions.append((action_type, {"old": user.status, "new": update_data["status"]}))
  296. if "role" in update_data:
  297. if current_user.role != "SUPER_ADMIN":
  298. del update_data["role"]
  299. elif user_id == current_user.id and update_data["role"] != "SUPER_ADMIN":
  300. # Prevent demoting self
  301. raise HTTPException(status_code=400, detail="超级管理员不能降级自己")
  302. else:
  303. # Require admin password for role change
  304. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  305. logger.warning(f"修改用户角色失败: 密码错误")
  306. raise HTTPException(status_code=403, detail="管理员密码错误")
  307. actions.append((ActionType.CHANGE_ROLE, {"old": user.role, "new": update_data["role"]}))
  308. if "admin_password" in update_data:
  309. del update_data["admin_password"]
  310. if "is_deleted" in update_data:
  311. if current_user.role != "SUPER_ADMIN":
  312. del update_data["is_deleted"]
  313. elif update_data["is_deleted"] == 1 and user_id == current_user.id:
  314. raise HTTPException(status_code=400, detail="超级管理员不能删除自己")
  315. elif update_data["is_deleted"] == 1:
  316. actions.append((ActionType.DELETE, {}))
  317. if "password" in update_data:
  318. password = update_data["password"]
  319. if password:
  320. password = password.strip()
  321. hashed_password = security.get_password_hash(password)
  322. del update_data["password"]
  323. user.password_hash = hashed_password
  324. # Auto-generate english_name if name changed and english_name not provided
  325. if "name" in update_data and update_data["name"]:
  326. update_data["name"] = update_data["name"].replace(" ", "")
  327. if "english_name" not in update_data or not update_data["english_name"]:
  328. update_data["english_name"] = generate_english_name(update_data["name"])
  329. for field, value in update_data.items():
  330. setattr(user, field, value)
  331. db.add(user)
  332. db.commit()
  333. db.refresh(user)
  334. # Trigger Webhook
  335. event_type = "UPDATE"
  336. if user.status == "DISABLED":
  337. event_type = "DISABLE"
  338. background_tasks.add_task(WebhookService.trigger_user_event, db, user.id, event_type)
  339. # Record Logs
  340. if current_user.role == "SUPER_ADMIN" and actions:
  341. for action_type, details in actions:
  342. LogService.create_log(
  343. db=db,
  344. operator_id=current_user.id,
  345. action_type=action_type,
  346. target_user_id=user.id,
  347. target_mobile=user.mobile,
  348. ip_address=get_client_ip(request),
  349. details=details
  350. )
  351. logger.info(f"更新用户信息成功: {user.mobile} (ID: {user.id})")
  352. return user
  353. @router.post("/{user_id}/promote", response_model=UserSchema, summary="提升用户权限")
  354. def promote_user(
  355. *,
  356. db: Session = Depends(deps.get_db),
  357. user_id: int,
  358. req: PromoteUserRequest,
  359. request: Request,
  360. current_user: User = Depends(deps.get_current_active_user),
  361. ):
  362. """
  363. 将开发者提升为超级管理员。
  364. 需要验证(管理员密码 + 验证码)。
  365. """
  366. if current_user.role != "SUPER_ADMIN":
  367. raise HTTPException(status_code=403, detail="权限不足")
  368. # 1. Verify Password
  369. if not security.verify_password(req.password, current_user.password_hash):
  370. logger.warning(f"提升权限失败: 密码错误 (Admin: {current_user.mobile})")
  371. raise HTTPException(status_code=403, detail="密码错误")
  372. # 2. Verify Captcha
  373. if not CaptchaService.verify_captcha(req.captcha_id, req.captcha_code):
  374. logger.warning(f"提升权限失败: 验证码错误")
  375. raise HTTPException(status_code=400, detail="验证码错误")
  376. user = db.query(User).filter(User.id == user_id).first()
  377. if not user:
  378. raise HTTPException(status_code=404, detail="用户不存在")
  379. old_role = user.role
  380. user.role = "SUPER_ADMIN"
  381. user.status = "ACTIVE" # Ensure they are active
  382. db.add(user)
  383. db.commit()
  384. db.refresh(user)
  385. # Log Operation
  386. LogService.create_log(
  387. db=db,
  388. operator_id=current_user.id,
  389. action_type=ActionType.CHANGE_ROLE,
  390. target_user_id=user.id,
  391. target_mobile=user.mobile,
  392. ip_address=get_client_ip(request),
  393. details={"old": old_role, "new": "SUPER_ADMIN"}
  394. )
  395. logger.info(f"提升用户为超管: {user.mobile}")
  396. return user
  397. @router.delete("/{user_id}", response_model=UserSchema, summary="删除用户")
  398. def delete_user(
  399. *,
  400. db: Session = Depends(deps.get_db),
  401. user_id: int,
  402. request: Request,
  403. current_user: User = Depends(deps.get_current_active_user),
  404. background_tasks: BackgroundTasks,
  405. ):
  406. """
  407. 软删除用户。仅限超级管理员。
  408. """
  409. if current_user.role != "SUPER_ADMIN":
  410. raise HTTPException(status_code=403, detail="权限不足")
  411. if user_id == current_user.id:
  412. raise HTTPException(status_code=400, detail="超级管理员不能删除自己")
  413. user = db.query(User).filter(User.id == user_id).first()
  414. if not user:
  415. raise HTTPException(status_code=404, detail="用户不存在")
  416. user.is_deleted = 1 # Using Integer 1 for True
  417. user.status = "DISABLED" # Also disable login
  418. db.add(user)
  419. db.commit()
  420. background_tasks.add_task(WebhookService.trigger_user_event, db, user.id, "DELETE")
  421. # Log Operation
  422. LogService.create_log(
  423. db=db,
  424. operator_id=current_user.id,
  425. action_type=ActionType.DELETE,
  426. target_user_id=user.id,
  427. target_mobile=user.mobile,
  428. ip_address=get_client_ip(request),
  429. details={"status": "DISABLED"}
  430. )
  431. logger.info(f"删除用户成功: {user.mobile}")
  432. return user
  433. @router.get("/me", response_model=UserSchema, summary="获取当前用户信息")
  434. def read_user_me(
  435. current_user: User = Depends(deps.get_current_active_user),
  436. ):
  437. """
  438. 获取当前登录用户的信息。
  439. """
  440. return current_user
  441. @router.get("/{user_id}", response_model=UserSchema, summary="根据ID获取用户")
  442. def read_user_by_id(
  443. user_id: int,
  444. current_user: User = Depends(deps.get_current_active_user),
  445. db: Session = Depends(deps.get_db),
  446. ):
  447. """
  448. 根据用户ID获取特定用户信息。
  449. """
  450. user = db.query(User).filter(User.id == user_id).first()
  451. if not user:
  452. raise HTTPException(
  453. status_code=404,
  454. detail="该用户ID在系统中不存在",
  455. )
  456. return user