jwt_utils.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import jwt
  2. import datetime
  3. from functools import wraps
  4. from flask import request, current_app
  5. from app.model.user_model import User
  6. from app import db
  7. class JWTUtils:
  8. @staticmethod
  9. def generate_token(user_id: int, username: str, expires_in: int = 3600):
  10. """
  11. 生成JWT token
  12. Args:
  13. user_id: 用户ID
  14. username: 用户名
  15. expires_in: token过期时间(秒),默认1小时
  16. Returns:
  17. str: JWT token
  18. """
  19. payload = {
  20. 'user_id': user_id,
  21. 'username': username,
  22. 'iat': datetime.datetime.utcnow()
  23. }
  24. # 仅当 expires_in > 0 时设置过期时间;<=0 表示永不过期
  25. if isinstance(expires_in, int) and expires_in > 0:
  26. payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in)
  27. token = jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
  28. return token
  29. @staticmethod
  30. def verify_token(token: str):
  31. """
  32. 验证JWT token
  33. Args:
  34. token: JWT token字符串
  35. Returns:
  36. dict: token payload,如果验证失败则返回None
  37. """
  38. try:
  39. payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
  40. return payload
  41. except jwt.ExpiredSignatureError:
  42. return None
  43. except jwt.InvalidTokenError:
  44. return None
  45. @staticmethod
  46. def get_current_user():
  47. """
  48. 从请求头中获取当前用户
  49. Returns:
  50. User: 当前用户对象,如果未认证则返回None
  51. """
  52. auth_header = request.headers.get('Authorization')
  53. if not auth_header:
  54. return None
  55. try:
  56. # 期望格式: "Bearer <token>"
  57. token = auth_header.split(' ')[1]
  58. except IndexError:
  59. return None
  60. payload = JWTUtils.verify_token(token)
  61. if not payload:
  62. return None
  63. user_id = payload.get('user_id')
  64. if not user_id:
  65. return None
  66. return db.session.query(User).get(user_id)
  67. def token_required(f):
  68. """
  69. JWT认证装饰器
  70. 使用方法:
  71. @token_required
  72. def protected_route():
  73. # 当前用户可以通过 g.current_user 访问
  74. pass
  75. """
  76. @wraps(f)
  77. def decorated(*args, **kwargs):
  78. from flask import g
  79. auth_header = request.headers.get('Authorization')
  80. if not auth_header:
  81. return {'message': '缺少认证token'}, 401
  82. try:
  83. # 期望格式: "Bearer <token>"
  84. token = auth_header.split(' ')[1]
  85. except IndexError:
  86. return {'message': '认证token格式错误'}, 401
  87. payload = JWTUtils.verify_token(token)
  88. if not payload:
  89. return {'message': '认证token无效或已过期'}, 401
  90. user_id = payload.get('user_id')
  91. if not user_id:
  92. return {'message': '认证token无效'}, 401
  93. # 获取用户信息
  94. user = db.session.query(User).get(user_id)
  95. if not user or not user.is_active:
  96. return {'message': '用户不存在或已被禁用'}, 401
  97. # 将用户信息存储到g对象中,供视图函数使用
  98. g.current_user = user
  99. return f(*args, **kwargs)
  100. return decorated
  101. def admin_required(f):
  102. """
  103. 管理员权限装饰器(可以扩展用户角色功能)
  104. 使用方法:
  105. @admin_required
  106. def admin_route():
  107. pass
  108. """
  109. @wraps(f)
  110. def decorated(*args, **kwargs):
  111. from flask import g
  112. if not hasattr(g, 'current_user') or not g.current_user:
  113. return jsonify({'message': '需要管理员权限'}), 403
  114. # 这里可以添加角色检查逻辑
  115. # 目前简单检查用户名是否为admin
  116. if g.current_user.username != 'admin':
  117. return jsonify({'message': '需要管理员权限'}), 403
  118. return f(*args, **kwargs)
  119. return decorated