import jwt import datetime from functools import wraps from flask import request, current_app from app.model.user_model import User from app import db class JWTUtils: @staticmethod def generate_token(user_id: int, username: str, expires_in: int = 3600): """ 生成JWT token Args: user_id: 用户ID username: 用户名 expires_in: token过期时间(秒),默认1小时 Returns: str: JWT token """ payload = { 'user_id': user_id, 'username': username, 'iat': datetime.datetime.utcnow() } # 仅当 expires_in > 0 时设置过期时间;<=0 表示永不过期 if isinstance(expires_in, int) and expires_in > 0: payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in) token = jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256') return token @staticmethod def verify_token(token: str): """ 验证JWT token Args: token: JWT token字符串 Returns: dict: token payload,如果验证失败则返回None """ try: payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256']) return payload except jwt.ExpiredSignatureError: return None except jwt.InvalidTokenError: return None @staticmethod def get_current_user(): """ 从请求头中获取当前用户 Returns: User: 当前用户对象,如果未认证则返回None """ auth_header = request.headers.get('Authorization') if not auth_header: return None try: # 期望格式: "Bearer " token = auth_header.split(' ')[1] except IndexError: return None payload = JWTUtils.verify_token(token) if not payload: return None user_id = payload.get('user_id') if not user_id: return None return db.session.query(User).get(user_id) def token_required(f): """ JWT认证装饰器 使用方法: @token_required def protected_route(): # 当前用户可以通过 g.current_user 访问 pass """ @wraps(f) def decorated(*args, **kwargs): from flask import g auth_header = request.headers.get('Authorization') if not auth_header: return {'message': '缺少认证token'}, 401 try: # 期望格式: "Bearer " token = auth_header.split(' ')[1] except IndexError: return {'message': '认证token格式错误'}, 401 payload = JWTUtils.verify_token(token) if not payload: return {'message': '认证token无效或已过期'}, 401 user_id = payload.get('user_id') if not user_id: return {'message': '认证token无效'}, 401 # 获取用户信息 user = db.session.query(User).get(user_id) if not user or not user.is_active: return {'message': '用户不存在或已被禁用'}, 401 # 将用户信息存储到g对象中,供视图函数使用 g.current_user = user return f(*args, **kwargs) return decorated def admin_required(f): """ 管理员权限装饰器(可以扩展用户角色功能) 使用方法: @admin_required def admin_route(): pass """ @wraps(f) def decorated(*args, **kwargs): from flask import g if not hasattr(g, 'current_user') or not g.current_user: return jsonify({'message': '需要管理员权限'}), 403 # 这里可以添加角色检查逻辑 # 目前简单检查用户名是否为admin if g.current_user.username != 'admin': return jsonify({'message': '需要管理员权限'}), 403 return f(*args, **kwargs) return decorated