# identity_qr.py — identity QR code API endpoints (token generation and verification).
  1. import logging
  2. import time
  3. from datetime import datetime, timezone
  4. from fastapi import APIRouter, Depends, HTTPException
  5. from sqlalchemy.orm import Session
  6. from app.api.v1 import deps
  7. from app.core.config import settings
  8. from app.core.identity_qr_crypto import (
  9. assert_payload_fresh,
  10. decrypt_identity_token,
  11. encrypt_identity_payload,
  12. )
  13. from app.models.user import User
  14. from app.schemas.identity_qr import (
  15. IdentityQrGenerateResponse,
  16. IdentityQrVerifyRequest,
  17. IdentityQrVerifyResponse,
  18. )
  19. router = APIRouter()
  20. logger = logging.getLogger(__name__)
  21. @router.get(
  22. "/",
  23. response_model=IdentityQrGenerateResponse,
  24. summary="获取个人身份二维码密文",
  25. )
  26. def get_identity_qr_payload(
  27. current_user: User = Depends(deps.get_current_active_user),
  28. ):
  29. """
  30. 返回 AES-256-GCM 加密载荷(含 id、姓名、手机号、exp),默认约 1 分钟内有效。
  31. 前端将 `token` 字符串生成二维码即可。
  32. """
  33. valid_seconds = settings.IDENTITY_QR_VALID_SECONDS
  34. exp_ts = time.time() + valid_seconds
  35. payload = {
  36. "id": current_user.id,
  37. "name": current_user.name or "",
  38. "mobile": current_user.mobile,
  39. "exp": exp_ts,
  40. }
  41. token = encrypt_identity_payload(payload)
  42. expires_at = datetime.fromtimestamp(exp_ts, tz=timezone.utc)
  43. return IdentityQrGenerateResponse(token=token, expires_at=expires_at)
  44. @router.post(
  45. "/verify",
  46. response_model=IdentityQrVerifyResponse,
  47. summary="解密并校验身份二维码",
  48. )
  49. def verify_identity_qr(
  50. body: IdentityQrVerifyRequest,
  51. db: Session = Depends(deps.get_db),
  52. _subject: deps.AuthSubject = Depends(deps.get_current_user_or_app_by_api_key),
  53. ):
  54. """
  55. 核验端(已登录用户或携带应用 API Key)提交扫码得到的 `token`,返回明文身份信息。
  56. """
  57. try:
  58. data = decrypt_identity_token(body.token)
  59. assert_payload_fresh(data)
  60. except ValueError as e:
  61. logger.info("identity_qr verify failed: %s", e)
  62. raise HTTPException(status_code=400, detail=str(e)) from e
  63. uid = data.get("id")
  64. if uid is None:
  65. raise HTTPException(status_code=400, detail="invalid payload: missing id")
  66. user = db.query(User).filter(User.id == int(uid)).first()
  67. if not user or user.is_deleted:
  68. raise HTTPException(status_code=404, detail="user not found")
  69. if user.status != "ACTIVE":
  70. raise HTTPException(status_code=400, detail="user inactive")
  71. mobile = data.get("mobile") or ""
  72. if user.mobile != mobile:
  73. raise HTTPException(status_code=400, detail="identity data mismatch")
  74. return IdentityQrVerifyResponse(
  75. id=user.id,
  76. name=user.name,
  77. mobile=user.mobile,
  78. )