| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import logging
- import time
- from datetime import datetime, timezone
- from fastapi import APIRouter, Depends, HTTPException
- from sqlalchemy.orm import Session
- from app.api.v1 import deps
- from app.core.config import settings
- from app.core.identity_qr_crypto import (
- assert_payload_fresh,
- decrypt_identity_token,
- encrypt_identity_payload,
- )
- from app.models.user import User
- from app.schemas.identity_qr import (
- IdentityQrGenerateResponse,
- IdentityQrVerifyRequest,
- IdentityQrVerifyResponse,
- )
- router = APIRouter()
- logger = logging.getLogger(__name__)
- @router.get(
- "/",
- response_model=IdentityQrGenerateResponse,
- summary="获取个人身份二维码密文",
- )
- def get_identity_qr_payload(
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 返回 AES-256-GCM 加密载荷(含 id、姓名、手机号、exp),默认约 1 分钟内有效。
- 前端将 `token` 字符串生成二维码即可。
- """
- valid_seconds = settings.IDENTITY_QR_VALID_SECONDS
- exp_ts = time.time() + valid_seconds
- payload = {
- "id": current_user.id,
- "name": current_user.name or "",
- "mobile": current_user.mobile,
- "exp": exp_ts,
- }
- token = encrypt_identity_payload(payload)
- expires_at = datetime.fromtimestamp(exp_ts, tz=timezone.utc)
- return IdentityQrGenerateResponse(token=token, expires_at=expires_at)
- @router.post(
- "/verify",
- response_model=IdentityQrVerifyResponse,
- summary="解密并校验身份二维码",
- )
- def verify_identity_qr(
- body: IdentityQrVerifyRequest,
- db: Session = Depends(deps.get_db),
- _subject: deps.AuthSubject = Depends(deps.get_current_user_or_app_by_api_key),
- ):
- """
- 核验端(已登录用户或携带应用 API Key)提交扫码得到的 `token`,返回明文身份信息。
- """
- try:
- data = decrypt_identity_token(body.token)
- assert_payload_fresh(data)
- except ValueError as e:
- logger.info("identity_qr verify failed: %s", e)
- raise HTTPException(status_code=400, detail=str(e)) from e
- uid = data.get("id")
- if uid is None:
- raise HTTPException(status_code=400, detail="invalid payload: missing id")
- user = db.query(User).filter(User.id == int(uid)).first()
- if not user or user.is_deleted:
- raise HTTPException(status_code=404, detail="user not found")
- if user.status != "ACTIVE":
- raise HTTPException(status_code=400, detail="user inactive")
- mobile = data.get("mobile") or ""
- if user.mobile != mobile:
- raise HTTPException(status_code=400, detail="identity data mismatch")
- return IdentityQrVerifyResponse(
- id=user.id,
- name=user.name,
- mobile=user.mobile,
- )
|