users.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. from typing import List, Any
  2. import logging
  3. from fastapi import APIRouter, Depends, HTTPException, Body, BackgroundTasks, Request, Query
  4. from sqlalchemy.orm import Session
  5. from sqlalchemy import or_
  6. from sqlalchemy.exc import IntegrityError
  7. from app.api.v1 import deps
  8. from app.core import security
  9. from app.core.utils import generate_english_name, get_client_ip
  10. from app.models.user import User, UserRole
  11. from app.models.mapping import AppUserMapping
  12. from app.schemas.user import User as UserSchema, UserCreate, UserUpdate, UserList, PromoteUserRequest, BatchResetEnglishNameRequest
  13. from app.services.webhook_service import WebhookService
  14. from app.services.captcha_service import CaptchaService
  15. from app.services.log_service import LogService
  16. from app.schemas.operation_log import ActionType
  17. router = APIRouter()
  18. logger = logging.getLogger(__name__)
  19. @router.get("/search", response_model=List[UserSchema], summary="搜索用户")
  20. def search_users(
  21. keyword: str = Query(None, alias="q"),
  22. limit: int = 20,
  23. db: Session = Depends(deps.get_db),
  24. current_subject: deps.AuthSubject = Depends(deps.get_current_user_or_app_by_api_key),
  25. ):
  26. """
  27. 搜索用户(支持姓名、手机号、英文名)。
  28. 支持用户 JWT Token 和应用 API Key (X-App-Access-Token) 两种认证方式。
  29. """
  30. query = db.query(User).filter(
  31. User.is_deleted == 0,
  32. User.status == "ACTIVE"
  33. )
  34. if keyword:
  35. query = query.filter(
  36. or_(
  37. User.mobile.ilike(f"%{keyword}%"),
  38. User.name.ilike(f"%{keyword}%"),
  39. User.english_name.ilike(f"%{keyword}%")
  40. )
  41. )
  42. # 如果是用户认证,排除自己;如果是应用认证,不排除
  43. if isinstance(current_subject, User):
  44. query = query.filter(User.id != current_subject.id)
  45. users = query.limit(limit).all()
  46. return users
  47. @router.get("/", response_model=UserList, summary="获取用户列表")
  48. def read_users(
  49. skip: int = 0,
  50. limit: int = 10,
  51. status: str = None,
  52. role: str = None,
  53. mobile: str = None,
  54. name: str = None,
  55. english_name: str = None,
  56. keyword: str = None,
  57. db: Session = Depends(deps.get_db),
  58. current_user: User = Depends(deps.get_current_active_user),
  59. ):
  60. """
  61. 获取用户列表。只有超级管理员可以查看所有用户。
  62. 支持通过 mobile, name, english_name 精确/模糊筛选,
  63. 也支持通过 keyword 进行跨字段模糊搜索。
  64. """
  65. if current_user.role != "SUPER_ADMIN":
  66. raise HTTPException(status_code=403, detail="权限不足")
  67. # Filter out soft-deleted users
  68. query = db.query(User).filter(User.is_deleted == 0)
  69. if status:
  70. query = query.filter(User.status == status)
  71. if role:
  72. query = query.filter(User.role == role)
  73. # Specific field filters (AND logic)
  74. if mobile:
  75. query = query.filter(User.mobile.ilike(f"%{mobile}%"))
  76. if name:
  77. query = query.filter(User.name.ilike(f"%{name}%"))
  78. if english_name:
  79. query = query.filter(User.english_name.ilike(f"%{english_name}%"))
  80. # Keyword search (OR logic across fields) - intersects with previous filters
  81. if keyword:
  82. query = query.filter(
  83. or_(
  84. User.mobile.ilike(f"%{keyword}%"),
  85. User.name.ilike(f"%{keyword}%"),
  86. User.english_name.ilike(f"%{keyword}%")
  87. )
  88. )
  89. total = query.count()
  90. users = query.order_by(User.id.desc()).offset(skip).limit(limit).all()
  91. return {"total": total, "items": users}
  92. @router.post("/", response_model=UserSchema, summary="创建用户")
  93. def create_user(
  94. *,
  95. db: Session = Depends(deps.get_db),
  96. user_in: UserCreate,
  97. request: Request,
  98. background_tasks: BackgroundTasks,
  99. current_user: User = Depends(deps.get_current_active_user),
  100. ):
  101. """
  102. 创建新用户。只有超级管理员可以直接创建用户。
  103. 公开注册请使用 /open/register。
  104. """
  105. if current_user.role != "SUPER_ADMIN":
  106. raise HTTPException(status_code=403, detail="权限不足")
  107. # Verify Admin Password
  108. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  109. logger.warning(f"管理员创建用户失败: 密码错误 (Admin: {current_user.mobile})")
  110. raise HTTPException(status_code=403, detail="管理员密码错误")
  111. user = db.query(User).filter(User.mobile == user_in.mobile).first()
  112. if user:
  113. raise HTTPException(
  114. status_code=400,
  115. detail="该手机号已在系统中注册",
  116. )
  117. if user_in.password:
  118. user_in.password = user_in.password.strip()
  119. # Remove spaces from name
  120. if user_in.name:
  121. user_in.name = user_in.name.replace(" ", "")
  122. # Generate default name if missing
  123. if not user_in.name:
  124. random_suffix = security.generate_alphanumeric_password(6)
  125. user_in.name = f"用户{random_suffix}"
  126. # Ensure unique name
  127. if user_in.name:
  128. original_name = user_in.name
  129. counter = 1
  130. while db.query(User).filter(User.name == user_in.name, User.is_deleted == 0).first():
  131. user_in.name = f"{original_name}{counter}"
  132. counter += 1
  133. english_name = user_in.english_name
  134. if not english_name and user_in.name:
  135. english_name = generate_english_name(user_in.name)
  136. # Ensure unique english_name
  137. if english_name:
  138. original_english_name = english_name
  139. counter = 1
  140. while db.query(User).filter(User.english_name == english_name, User.is_deleted == 0).first():
  141. english_name = f"{original_english_name}{counter}"
  142. counter += 1
  143. db_user = User(
  144. mobile=user_in.mobile,
  145. name=user_in.name,
  146. english_name=english_name,
  147. password_hash=security.get_password_hash(user_in.password),
  148. status="ACTIVE", # Admin created users are active by default
  149. role=user_in.role or UserRole.ORDINARY_USER
  150. )
  151. db.add(db_user)
  152. db.commit()
  153. db.refresh(db_user)
  154. # Log Operation
  155. LogService.create_log(
  156. db=db,
  157. operator_id=current_user.id,
  158. action_type=ActionType.MANUAL_ADD,
  159. target_user_id=db_user.id,
  160. target_mobile=db_user.mobile,
  161. ip_address=get_client_ip(request),
  162. details={"role": db_user.role}
  163. )
  164. logger.info(f"管理员创建用户成功: {db_user.mobile} (Role: {db_user.role})")
  165. return db_user
  166. @router.post("/batch/reset-english-name", summary="批量重置用户英文名")
  167. def batch_reset_english_name(
  168. *,
  169. db: Session = Depends(deps.get_db),
  170. req: BatchResetEnglishNameRequest,
  171. request: Request,
  172. current_user: User = Depends(deps.get_current_active_user),
  173. ):
  174. """
  175. 批量重置选中用户的英文名。
  176. 规则:根据姓名生成拼音,如有重复自动追加数字后缀。
  177. 需要管理员密码验证。
  178. """
  179. if current_user.role != "SUPER_ADMIN":
  180. raise HTTPException(status_code=403, detail="权限不足")
  181. # Verify Admin Password
  182. if not security.verify_password(req.admin_password, current_user.password_hash):
  183. logger.warning(f"批量重置英文名失败: 密码错误 (Admin: {current_user.mobile})")
  184. raise HTTPException(status_code=403, detail="管理员密码错误")
  185. if not req.user_ids:
  186. raise HTTPException(status_code=400, detail="请选择用户")
  187. success_count = 0
  188. users = db.query(User).filter(User.id.in_(req.user_ids), User.is_deleted == 0).all()
  189. for user in users:
  190. if not user.name:
  191. continue
  192. old_english_name = user.english_name
  193. new_english_name = generate_english_name(user.name)
  194. # Uniqueness check
  195. if new_english_name:
  196. original_base = new_english_name
  197. counter = 1
  198. # Helper to check existence
  199. def check_exists(name, current_id):
  200. return db.query(User).filter(
  201. User.english_name == name,
  202. User.is_deleted == 0,
  203. User.id != current_id
  204. ).first()
  205. while check_exists(new_english_name, user.id):
  206. new_english_name = f"{original_base}{counter}"
  207. counter += 1
  208. if old_english_name != new_english_name:
  209. user.english_name = new_english_name
  210. db.add(user)
  211. # Log
  212. LogService.create_log(
  213. db=db,
  214. operator_id=current_user.id,
  215. action_type=ActionType.UPDATE,
  216. target_user_id=user.id,
  217. target_mobile=user.mobile,
  218. ip_address=get_client_ip(request),
  219. details={
  220. "field": "english_name",
  221. "old": old_english_name,
  222. "new": new_english_name,
  223. "reason": "BATCH_RESET"
  224. }
  225. )
  226. success_count += 1
  227. db.commit()
  228. logger.info(f"批量重置英文名完成: 成功 {success_count} 个 (Admin: {current_user.mobile})")
  229. return {"success": True, "count": success_count}
  230. @router.put("/{user_id}", response_model=UserSchema, summary="更新用户")
  231. def update_user(
  232. *,
  233. db: Session = Depends(deps.get_db),
  234. user_id: int,
  235. user_in: UserUpdate,
  236. request: Request,
  237. background_tasks: BackgroundTasks,
  238. current_user: User = Depends(deps.get_current_active_user),
  239. ):
  240. """
  241. 更新用户信息。超级管理员可以更新任何人,用户只能更新自己。
  242. """
  243. user = db.query(User).filter(User.id == user_id).first()
  244. if not user:
  245. raise HTTPException(status_code=404, detail="用户不存在")
  246. # Permission Check
  247. if current_user.role != "SUPER_ADMIN" and current_user.id != user_id:
  248. raise HTTPException(status_code=403, detail="权限不足")
  249. update_data = user_in.model_dump(exclude_unset=True)
  250. # Track actions for logging
  251. actions = []
  252. # Only Super Admin can change mobile
  253. if "mobile" in update_data:
  254. if current_user.role != "SUPER_ADMIN":
  255. del update_data["mobile"]
  256. else:
  257. # Require admin password for mobile change
  258. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  259. logger.warning(f"修改用户手机号失败: 管理员密码错误 (Target: {user.mobile})")
  260. raise HTTPException(status_code=403, detail="管理员密码错误")
  261. # Check uniqueness
  262. existing_user = db.query(User).filter(User.mobile == update_data["mobile"]).first()
  263. if existing_user and existing_user.id != user_id:
  264. raise HTTPException(status_code=400, detail="该手机号已存在")
  265. if user.mobile != update_data["mobile"]:
  266. old_mobile = user.mobile
  267. new_mobile = update_data["mobile"]
  268. # Update Mappings first (logic: user mobile change should propagate to mappings)
  269. # Find mappings where mapped_key matches old_mobile
  270. mappings = db.query(AppUserMapping).filter(
  271. AppUserMapping.user_id == user.id,
  272. AppUserMapping.mapped_key == old_mobile
  273. ).all()
  274. for mapping in mappings:
  275. mapping.mapped_key = new_mobile
  276. db.add(mapping)
  277. try:
  278. db.flush()
  279. except IntegrityError:
  280. db.rollback()
  281. logger.error(f"修改手机号失败: 映射冲突 ({old_mobile} -> {new_mobile})")
  282. raise HTTPException(status_code=400, detail="修改失败:新手机号在某些应用中已存在映射关联")
  283. actions.append((ActionType.UPDATE, {"field": "mobile", "old": old_mobile, "new": new_mobile}))
  284. # Only Super Admin can change status or role
  285. if "status" in update_data:
  286. if current_user.role != "SUPER_ADMIN":
  287. del update_data["status"]
  288. elif update_data["status"] == "DISABLED" and user_id == current_user.id:
  289. raise HTTPException(status_code=400, detail="超级管理员不能禁用自己")
  290. else:
  291. # Require admin password for status change
  292. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  293. logger.warning(f"修改用户状态失败: 密码错误")
  294. raise HTTPException(status_code=403, detail="管理员密码错误")
  295. # Add Log Action
  296. action_type = ActionType.DISABLE if update_data["status"] == "DISABLED" else ActionType.ENABLE
  297. actions.append((action_type, {"old": user.status, "new": update_data["status"]}))
  298. if "role" in update_data:
  299. if current_user.role != "SUPER_ADMIN":
  300. del update_data["role"]
  301. elif user_id == current_user.id and update_data["role"] != "SUPER_ADMIN":
  302. # Prevent demoting self
  303. raise HTTPException(status_code=400, detail="超级管理员不能降级自己")
  304. else:
  305. # Require admin password for role change
  306. if not user_in.admin_password or not security.verify_password(user_in.admin_password, current_user.password_hash):
  307. logger.warning(f"修改用户角色失败: 密码错误")
  308. raise HTTPException(status_code=403, detail="管理员密码错误")
  309. actions.append((ActionType.CHANGE_ROLE, {"old": user.role, "new": update_data["role"]}))
  310. if "admin_password" in update_data:
  311. del update_data["admin_password"]
  312. if "is_deleted" in update_data:
  313. if current_user.role != "SUPER_ADMIN":
  314. del update_data["is_deleted"]
  315. elif update_data["is_deleted"] == 1 and user_id == current_user.id:
  316. raise HTTPException(status_code=400, detail="超级管理员不能删除自己")
  317. elif update_data["is_deleted"] == 1:
  318. actions.append((ActionType.DELETE, {}))
  319. if "password" in update_data:
  320. password = update_data["password"]
  321. if password:
  322. password = password.strip()
  323. hashed_password = security.get_password_hash(password)
  324. del update_data["password"]
  325. user.password_hash = hashed_password
  326. # Auto-generate english_name if name changed and english_name not provided
  327. if "name" in update_data and update_data["name"]:
  328. update_data["name"] = update_data["name"].replace(" ", "")
  329. if "english_name" not in update_data or not update_data["english_name"]:
  330. update_data["english_name"] = generate_english_name(update_data["name"])
  331. for field, value in update_data.items():
  332. setattr(user, field, value)
  333. db.add(user)
  334. db.commit()
  335. db.refresh(user)
  336. # Trigger Webhook
  337. event_type = "UPDATE"
  338. if user.status == "DISABLED":
  339. event_type = "DISABLE"
  340. background_tasks.add_task(WebhookService.trigger_user_event, db, user.id, event_type)
  341. # Record Logs
  342. if current_user.role == "SUPER_ADMIN" and actions:
  343. for action_type, details in actions:
  344. LogService.create_log(
  345. db=db,
  346. operator_id=current_user.id,
  347. action_type=action_type,
  348. target_user_id=user.id,
  349. target_mobile=user.mobile,
  350. ip_address=get_client_ip(request),
  351. details=details
  352. )
  353. logger.info(f"更新用户信息成功: {user.mobile} (ID: {user.id})")
  354. return user
  355. @router.post("/{user_id}/promote", response_model=UserSchema, summary="提升用户权限")
  356. def promote_user(
  357. *,
  358. db: Session = Depends(deps.get_db),
  359. user_id: int,
  360. req: PromoteUserRequest,
  361. request: Request,
  362. current_user: User = Depends(deps.get_current_active_user),
  363. ):
  364. """
  365. 将开发者提升为超级管理员。
  366. 需要验证(管理员密码 + 验证码)。
  367. """
  368. if current_user.role != "SUPER_ADMIN":
  369. raise HTTPException(status_code=403, detail="权限不足")
  370. # 1. Verify Password
  371. if not security.verify_password(req.password, current_user.password_hash):
  372. logger.warning(f"提升权限失败: 密码错误 (Admin: {current_user.mobile})")
  373. raise HTTPException(status_code=403, detail="密码错误")
  374. # 2. Verify Captcha
  375. if not CaptchaService.verify_captcha(req.captcha_id, req.captcha_code):
  376. logger.warning(f"提升权限失败: 验证码错误")
  377. raise HTTPException(status_code=400, detail="验证码错误")
  378. user = db.query(User).filter(User.id == user_id).first()
  379. if not user:
  380. raise HTTPException(status_code=404, detail="用户不存在")
  381. old_role = user.role
  382. user.role = "SUPER_ADMIN"
  383. user.status = "ACTIVE" # Ensure they are active
  384. db.add(user)
  385. db.commit()
  386. db.refresh(user)
  387. # Log Operation
  388. LogService.create_log(
  389. db=db,
  390. operator_id=current_user.id,
  391. action_type=ActionType.CHANGE_ROLE,
  392. target_user_id=user.id,
  393. target_mobile=user.mobile,
  394. ip_address=get_client_ip(request),
  395. details={"old": old_role, "new": "SUPER_ADMIN"}
  396. )
  397. logger.info(f"提升用户为超管: {user.mobile}")
  398. return user
  399. @router.delete("/{user_id}", response_model=UserSchema, summary="删除用户")
  400. def delete_user(
  401. *,
  402. db: Session = Depends(deps.get_db),
  403. user_id: int,
  404. request: Request,
  405. current_user: User = Depends(deps.get_current_active_user),
  406. background_tasks: BackgroundTasks,
  407. ):
  408. """
  409. 软删除用户。仅限超级管理员。
  410. """
  411. if current_user.role != "SUPER_ADMIN":
  412. raise HTTPException(status_code=403, detail="权限不足")
  413. if user_id == current_user.id:
  414. raise HTTPException(status_code=400, detail="超级管理员不能删除自己")
  415. user = db.query(User).filter(User.id == user_id).first()
  416. if not user:
  417. raise HTTPException(status_code=404, detail="用户不存在")
  418. user.is_deleted = 1 # Using Integer 1 for True
  419. user.status = "DISABLED" # Also disable login
  420. db.add(user)
  421. db.commit()
  422. background_tasks.add_task(WebhookService.trigger_user_event, db, user.id, "DELETE")
  423. # Log Operation
  424. LogService.create_log(
  425. db=db,
  426. operator_id=current_user.id,
  427. action_type=ActionType.DELETE,
  428. target_user_id=user.id,
  429. target_mobile=user.mobile,
  430. ip_address=get_client_ip(request),
  431. details={"status": "DISABLED"}
  432. )
  433. logger.info(f"删除用户成功: {user.mobile}")
  434. return user
  435. @router.get("/me", response_model=UserSchema, summary="获取当前用户信息")
  436. def read_user_me(
  437. current_user: User = Depends(deps.get_current_active_user),
  438. ):
  439. """
  440. 获取当前登录用户的信息。
  441. """
  442. return current_user
  443. @router.get("/{user_id}", response_model=UserSchema, summary="根据ID获取用户")
  444. def read_user_by_id(
  445. user_id: int,
  446. current_user: User = Depends(deps.get_current_active_user),
  447. db: Session = Depends(deps.get_db),
  448. ):
  449. """
  450. 根据用户ID获取特定用户信息。
  451. """
  452. user = db.query(User).filter(User.id == user_id).first()
  453. if not user:
  454. raise HTTPException(
  455. status_code=404,
  456. detail="该用户ID在系统中不存在",
  457. )
  458. return user