auth.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. from typing import Any
  2. from datetime import timedelta
  3. from fastapi import APIRouter, Depends, HTTPException
  4. from fastapi.security import OAuth2PasswordRequestForm
  5. from sqlalchemy.orm import Session
  6. from app.api.v1 import deps
  7. from app.core import security
  8. from app.core.config import settings
  9. from app.models.user import User
  10. from app.models.application import Application
  11. from app.schemas.token import Token, LoginRequest, AppLoginRequest
  12. router = APIRouter()
  13. @router.post("/login", response_model=Token, summary="用户登录 (OAuth2)")
  14. def login(
  15. db: Session = Depends(deps.get_db),
  16. # We support both OAuth2 form (username field usually) and JSON body
  17. form_data: OAuth2PasswordRequestForm = Depends(),
  18. ) -> Any:
  19. """
  20. OAuth2 兼容的令牌登录,获取访问令牌以进行后续请求
  21. """
  22. # Note: OAuth2PasswordRequestForm uses 'username', we map it to 'mobile'
  23. user = db.query(User).filter(User.mobile == form_data.username).first()
  24. if not user or not security.verify_password(form_data.password, user.password_hash):
  25. raise HTTPException(status_code=400, detail="手机号或密码错误")
  26. if user.status == "PENDING":
  27. raise HTTPException(status_code=403, detail="账户待审核")
  28. if user.status == "DISABLED":
  29. raise HTTPException(status_code=400, detail="账户已禁用")
  30. access_token = security.create_access_token(subject=user.id)
  31. return {
  32. "access_token": access_token,
  33. "token_type": "bearer",
  34. }
  35. @router.post("/login/json", response_model=Token, summary="用户登录 (JSON)")
  36. def login_json(
  37. login_data: LoginRequest,
  38. db: Session = Depends(deps.get_db),
  39. ) -> Any:
  40. """
  41. 使用 JSON 载荷进行登录
  42. """
  43. user = db.query(User).filter(User.mobile == login_data.mobile).first()
  44. if not user or not security.verify_password(login_data.password, user.password_hash):
  45. raise HTTPException(status_code=400, detail="手机号或密码错误")
  46. if user.status == "PENDING":
  47. raise HTTPException(status_code=403, detail="账户待审核")
  48. if user.status == "DISABLED":
  49. raise HTTPException(status_code=400, detail="账户已禁用")
  50. access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
  51. if login_data.remember_me:
  52. access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES_LONG)
  53. access_token = security.create_access_token(
  54. subject=user.id,
  55. expires_delta=access_token_expires,
  56. is_long_term=login_data.remember_me
  57. )
  58. return {
  59. "access_token": access_token,
  60. "token_type": "bearer",
  61. }
  62. @router.post("/app-token", response_model=Token, summary="应用登录")
  63. def app_login(
  64. req: AppLoginRequest,
  65. db: Session = Depends(deps.get_db),
  66. ):
  67. """
  68. 获取应用的访问令牌(客户端凭证模式)
  69. """
  70. app = db.query(Application).filter(Application.app_id == req.app_id).first()
  71. if not app:
  72. raise HTTPException(status_code=400, detail="无效的 client_id 或 client_secret")
  73. if app.app_secret != req.app_secret:
  74. raise HTTPException(status_code=400, detail="无效的 client_id 或 client_secret")
  75. access_token = security.create_access_token(subject=f"app:{app.id}")
  76. return {
  77. "access_token": access_token,
  78. "token_type": "bearer",
  79. }