| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- from typing import Any
- from datetime import timedelta
- from fastapi import APIRouter, Depends, HTTPException
- from fastapi.security import OAuth2PasswordRequestForm
- from sqlalchemy.orm import Session
- from app.api.v1 import deps
- from app.core import security
- from app.core.config import settings
- from app.models.user import User
- from app.models.application import Application
- from app.schemas.token import Token, LoginRequest, AppLoginRequest
- router = APIRouter()
@router.post("/login", response_model=Token, summary="用户登录 (OAuth2)")
def login(
    db: Session = Depends(deps.get_db),
    # Form-encoded OAuth2 password-flow credentials.
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
    """Issue an access token for an OAuth2-compatible form login.

    The returned token is used to authenticate subsequent requests.

    Args:
        db: Database session provided by ``deps.get_db``.
        form_data: OAuth2 password form; its ``username`` field is matched
            against ``User.mobile``.

    Returns:
        A mapping with ``access_token`` and ``token_type`` ("bearer").

    Raises:
        HTTPException: 400 when no user matches or the password check fails,
            403 when the user's status is ``"PENDING"``, 400 when it is
            ``"DISABLED"``.
    """
    # The OAuth2 form only exposes ``username``; here it carries the mobile number.
    account = db.query(User).filter(User.mobile == form_data.username).first()

    # Unknown mobile and wrong password deliberately share one error response.
    if not (
        account
        and security.verify_password(form_data.password, account.password_hash)
    ):
        raise HTTPException(status_code=400, detail="手机号或密码错误")

    status = account.status
    if status == "PENDING":
        raise HTTPException(status_code=403, detail="账户待审核")
    if status == "DISABLED":
        raise HTTPException(status_code=400, detail="账户已禁用")

    return {
        "access_token": security.create_access_token(subject=account.id),
        "token_type": "bearer",
    }
@router.post("/login/json", response_model=Token, summary="用户登录 (JSON)")
def login_json(
    login_data: LoginRequest,
    db: Session = Depends(deps.get_db),
) -> Any:
    """Log a user in from a JSON payload and issue an access token.

    Args:
        login_data: JSON body; this handler reads ``mobile``, ``password``
            and ``remember_me`` from it.
        db: Database session provided by ``deps.get_db``.

    Returns:
        A mapping with ``access_token`` and ``token_type`` ("bearer").

    Raises:
        HTTPException: 400 when no user matches or the password check fails,
            403 when the user's status is ``"PENDING"``, 400 when it is
            ``"DISABLED"``.
    """
    account = db.query(User).filter(User.mobile == login_data.mobile).first()

    # Unknown mobile and wrong password deliberately share one error response.
    if not (
        account
        and security.verify_password(login_data.password, account.password_hash)
    ):
        raise HTTPException(status_code=400, detail="手机号或密码错误")

    status = account.status
    if status == "PENDING":
        raise HTTPException(status_code=403, detail="账户待审核")
    if status == "DISABLED":
        raise HTTPException(status_code=400, detail="账户已禁用")

    # "Remember me" selects the long-lived token lifetime from settings.
    lifetime_minutes = (
        settings.ACCESS_TOKEN_EXPIRE_MINUTES_LONG
        if login_data.remember_me
        else settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    token = security.create_access_token(
        subject=account.id,
        expires_delta=timedelta(minutes=lifetime_minutes),
        is_long_term=login_data.remember_me,
    )
    return {"access_token": token, "token_type": "bearer"}
@router.post("/app-token", response_model=Token, summary="应用登录")
def app_login(
    req: AppLoginRequest,
    db: Session = Depends(deps.get_db),
) -> Any:
    """Issue an access token for an application (client-credentials style).

    Args:
        req: Request body; this handler reads ``app_id`` and ``app_secret``.
        db: Database session provided by ``deps.get_db``.

    Returns:
        A mapping with ``access_token`` and ``token_type`` ("bearer"). The
        token subject is ``"app:<application id>"``.

    Raises:
        HTTPException: 400 with the same message whether the application is
            unknown or the secret does not match.
    """
    # Imported locally so this handler does not depend on a module-level import.
    import hmac

    invalid_credentials = HTTPException(
        status_code=400, detail="无效的 client_id 或 client_secret"
    )

    client = db.query(Application).filter(Application.app_id == req.app_id).first()
    if not client:
        raise invalid_credentials

    stored_secret = client.app_secret
    supplied_secret = req.app_secret
    if isinstance(stored_secret, str) and isinstance(supplied_secret, str):
        # Constant-time comparison so response timing does not reveal how many
        # leading characters of the secret were correct. Encoding to bytes
        # keeps compare_digest from raising on non-ASCII text.
        secrets_match = hmac.compare_digest(
            stored_secret.encode("utf-8"), supplied_secret.encode("utf-8")
        )
    else:
        # Non-string values (e.g. a missing stored secret) keep the plain
        # equality semantics this endpoint had before.
        secrets_match = stored_secret == supplied_secret

    if not secrets_match:
        raise invalid_credentials

    # NOTE(review): the secret is compared as-is, which suggests it is stored
    # unhashed — confirm whether it should be hashed like user passwords.
    return {
        "access_token": security.create_access_token(subject=f"app:{client.id}"),
        "token_type": "bearer",
    }
|