import logging
import time
from datetime import datetime, timezone

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from app.api.v1 import deps
from app.core.config import settings
from app.core.identity_qr_crypto import (
    assert_payload_fresh,
    decrypt_identity_token,
    encrypt_identity_payload,
)
from app.models.user import User
from app.schemas.identity_qr import (
    IdentityQrGenerateResponse,
    IdentityQrVerifyRequest,
    IdentityQrVerifyResponse,
)

router = APIRouter()
logger = logging.getLogger(__name__)


@router.get(
    "/",
    response_model=IdentityQrGenerateResponse,
    summary="获取个人身份二维码密文",
)
def get_identity_qr_payload(
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Return the encrypted identity payload for the current user.

    The payload carries the user's id, name, mobile and an `exp` Unix
    timestamp set `settings.IDENTITY_QR_VALID_SECONDS` seconds in the future
    (the original note describes it as AES-256-GCM encrypted and valid for
    about one minute by default). The frontend only needs to render the
    returned `token` string as a QR code.
    """
    valid_seconds = settings.IDENTITY_QR_VALID_SECONDS
    exp_ts = time.time() + valid_seconds
    payload = {
        "id": current_user.id,
        "name": current_user.name or "",
        "mobile": current_user.mobile,
        "exp": exp_ts,
    }
    token = encrypt_identity_payload(payload)
    # Expose the same expiry instant to the client as a timezone-aware UTC datetime.
    expires_at = datetime.fromtimestamp(exp_ts, tz=timezone.utc)
    return IdentityQrGenerateResponse(token=token, expires_at=expires_at)


@router.post(
    "/verify",
    response_model=IdentityQrVerifyResponse,
    summary="解密并校验身份二维码",
)
def verify_identity_qr(
    body: IdentityQrVerifyRequest,
    db: Session = Depends(deps.get_db),
    _subject: deps.AuthSubject = Depends(deps.get_current_user_or_app_by_api_key),
):
    """
    Decrypt a scanned identity token and return the matching user's identity.

    The caller (a logged-in user or an application presenting an API key)
    submits the `token` obtained from scanning the QR code.

    Raises:
        HTTPException 400: the token cannot be decrypted or fails the
            freshness check, the payload has no usable `id`, the user is not
            ACTIVE, or the mobile in the token differs from the stored one.
        HTTPException 404: the user does not exist or is marked deleted.
    """
    try:
        data = decrypt_identity_token(body.token)
        assert_payload_fresh(data)
    except ValueError as e:
        logger.info("identity_qr verify failed: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e

    uid = data.get("id")
    if uid is None:
        raise HTTPException(status_code=400, detail="invalid payload: missing id")

    # A non-numeric id would otherwise escape as an unhandled error (HTTP 500);
    # report it as a bad payload like the other payload checks.
    try:
        user_id = int(uid)
    except (TypeError, ValueError) as e:
        raise HTTPException(status_code=400, detail="invalid payload: bad id") from e

    user = db.query(User).filter(User.id == user_id).first()
    if not user or user.is_deleted:
        raise HTTPException(status_code=404, detail="user not found")
    if user.status != "ACTIVE":
        raise HTTPException(status_code=400, detail="user inactive")

    # Normalise both sides: the token may carry a null mobile (the generator
    # copies `current_user.mobile` verbatim), and the stored value may be null
    # as well (presumably a nullable column - confirm against the User model).
    # Comparing None with "" would reject such a user on every verification.
    token_mobile = data.get("mobile") or ""
    stored_mobile = user.mobile or ""
    if stored_mobile != token_mobile:
        raise HTTPException(status_code=400, detail="identity data mismatch")

    return IdentityQrVerifyResponse(
        id=user.id,
        name=user.name,
        mobile=user.mobile,
    )