| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- import jwt
- import datetime
- from functools import wraps
- from flask import request, current_app
- from app.model.user_model import User
- from app import db
- class JWTUtils:
- @staticmethod
- def generate_token(user_id: int, username: str, expires_in: int = 3600):
- """
- 生成JWT token
-
- Args:
- user_id: 用户ID
- username: 用户名
- expires_in: token过期时间(秒),默认1小时
-
- Returns:
- str: JWT token
- """
- payload = {
- 'user_id': user_id,
- 'username': username,
- 'iat': datetime.datetime.utcnow()
- }
- # 仅当 expires_in > 0 时设置过期时间;<=0 表示永不过期
- if isinstance(expires_in, int) and expires_in > 0:
- payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in)
-
- token = jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
- return token
- @staticmethod
- def verify_token(token: str):
- """
- 验证JWT token
-
- Args:
- token: JWT token字符串
-
- Returns:
- dict: token payload,如果验证失败则返回None
- """
- try:
- payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
- return payload
- except jwt.ExpiredSignatureError:
- return None
- except jwt.InvalidTokenError:
- return None
- @staticmethod
- def get_current_user():
- """
- 从请求头中获取当前用户
-
- Returns:
- User: 当前用户对象,如果未认证则返回None
- """
- auth_header = request.headers.get('Authorization')
- if not auth_header:
- return None
-
- try:
- # 期望格式: "Bearer <token>"
- token = auth_header.split(' ')[1]
- except IndexError:
- return None
-
- payload = JWTUtils.verify_token(token)
- if not payload:
- return None
-
- user_id = payload.get('user_id')
- if not user_id:
- return None
-
- return db.session.query(User).get(user_id)
- def token_required(f):
- """
- JWT认证装饰器
-
- 使用方法:
- @token_required
- def protected_route():
- # 当前用户可以通过 g.current_user 访问
- pass
- """
- @wraps(f)
- def decorated(*args, **kwargs):
- from flask import g
-
- auth_header = request.headers.get('Authorization')
- if not auth_header:
- return {'message': '缺少认证token'}, 401
-
- try:
- # 期望格式: "Bearer <token>"
- token = auth_header.split(' ')[1]
- except IndexError:
- return {'message': '认证token格式错误'}, 401
-
- payload = JWTUtils.verify_token(token)
- if not payload:
- return {'message': '认证token无效或已过期'}, 401
-
- user_id = payload.get('user_id')
- if not user_id:
- return {'message': '认证token无效'}, 401
-
- # 获取用户信息
- user = db.session.query(User).get(user_id)
- if not user or not user.is_active:
- return {'message': '用户不存在或已被禁用'}, 401
-
- # 将用户信息存储到g对象中,供视图函数使用
- g.current_user = user
- return f(*args, **kwargs)
-
- return decorated
- def admin_required(f):
- """
- 管理员权限装饰器(可以扩展用户角色功能)
-
- 使用方法:
- @admin_required
- def admin_route():
- pass
- """
- @wraps(f)
- def decorated(*args, **kwargs):
- from flask import g
-
- if not hasattr(g, 'current_user') or not g.current_user:
- return jsonify({'message': '需要管理员权限'}), 403
-
- # 这里可以添加角色检查逻辑
- # 目前简单检查用户名是否为admin
- if g.current_user.username != 'admin':
- return jsonify({'message': '需要管理员权限'}), 403
-
- return f(*args, **kwargs)
-
- return decorated
|