| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- from flask import request, g
- from flask_restx import Resource, Namespace, fields
- from app.service.user_service import UserService
- from app.utils.jwt_utils import JWTUtils, token_required
- from app.utils.logger import Logger, log_request_info, ErrorHandler
- from app import db
# Namespace that groups the authentication endpoints.
api = Namespace('auth', description='用户认证相关操作')

# Request payload and response models registered on the namespace.

# Payload expected by the login endpoint.
login_model = api.model('Login', dict(
    username=fields.String(required=True, description='用户名'),
    password=fields.String(required=True, description='密码'),
))

# Payload expected by the registration endpoint.
register_model = api.model('Register', dict(
    username=fields.String(required=True, description='用户名'),
    email=fields.String(required=True, description='邮箱'),
    password=fields.String(required=True, description='密码'),
))

# Payload expected by the change-password endpoint.
change_password_model = api.model('ChangePassword', dict(
    old_password=fields.String(required=True, description='当前密码'),
    new_password=fields.String(required=True, description='新密码'),
))

# Shape of a user record as returned to clients.
user_model = api.model('User', dict(
    id=fields.Integer(readOnly=True, description='用户ID'),
    username=fields.String(description='用户名'),
    email=fields.String(description='邮箱'),
    is_active=fields.Boolean(description='是否激活'),
    created_at=fields.String(description='创建时间'),
    updated_at=fields.String(description='更新时间'),
))

# Shape of a token response: the token itself plus the nested user record.
token_response_model = api.model('TokenResponse', dict(
    access_token=fields.String(description='访问令牌'),
    token_type=fields.String(description='令牌类型'),
    expires_in=fields.Integer(description='过期时间(秒)'),
    user=fields.Nested(user_model, description='用户信息'),
))
@api.route('/register')
class RegisterResource(Resource):
    """Resource mounted at ``/register`` that creates user accounts."""

    @api.expect(register_model, validate=True)
    @api.marshal_with(user_model, code=201)
    @api.response(400, '注册失败')
    @log_request_info
    def post(self):
        """Register a new user from the JSON request body.

        Reads ``username``, ``email`` and ``password`` from the payload and
        passes them to ``UserService.create_user``.

        Returns:
            A ``(new_user.to_dict(), 201)`` tuple on success.

        Raises:
            Aborts with HTTP 400 (carrying the error text) when a
            ``ValueError`` is raised inside the ``try`` block.
        """
        payload = request.get_json()
        user_service = UserService(db.session)

        # Log the registration attempt before touching the service.
        attempt_context = {
            'username': payload.get('username'),
            'email': payload.get('email'),
            'ip_address': request.remote_addr,
            'user_agent': request.headers.get('User-Agent'),
        }
        Logger.info("用户注册尝试", attempt_context)

        try:
            created = user_service.create_user(
                username=payload['username'],
                email=payload['email'],
                password=payload['password'],
            )

            # Log the successful registration.
            success_context = {
                'user_id': created.id,
                'username': created.username,
                'email': created.email,
                'ip_address': request.remote_addr,
            }
            Logger.info("用户注册成功", success_context)

            return created.to_dict(), 201
        except ValueError as exc:
            reason = str(exc)

            # Log the failed registration together with the reason.
            failure_context = {
                'username': payload.get('username'),
                'email': payload.get('email'),
                'error': reason,
                'ip_address': request.remote_addr,
            }
            Logger.warning("用户注册失败", failure_context)
            api.abort(400, reason)
@api.route('/login')
class LoginResource(Resource):
    """Resource mounted at ``/login`` that authenticates a user."""

    @api.expect(login_model, validate=True)
    @api.marshal_with(token_response_model)
    @api.response(401, '登录失败')
    @log_request_info
    def post(self):
        """Authenticate the user and hand back an access token.

        Reads ``username`` and ``password`` from the JSON body, calls
        ``UserService.authenticate_user`` and then
        ``JWTUtils.generate_token`` with ``expires_in=0``.

        Returns:
            A dict with ``access_token``, ``token_type``, ``expires_in`` and
            the user's ``to_dict_public()`` under ``user``.

        Raises:
            Aborts with HTTP 401 (carrying the error text) when a
            ``ValueError`` is raised inside the ``try`` block.
        """
        payload = request.get_json()
        user_service = UserService(db.session)

        # Log the login attempt.
        attempt_context = {
            'username': payload.get('username'),
            'ip_address': request.remote_addr,
            'user_agent': request.headers.get('User-Agent'),
        }
        Logger.info("用户登录尝试", attempt_context)

        try:
            account = user_service.authenticate_user(
                username=payload['username'],
                password=payload['password'],
            )

            # Per the original author's note, expires_in=0 yields a JWT that
            # never expires.
            # NOTE(review): non-expiring tokens on every login look risky --
            # confirm this is intended.
            access_token = JWTUtils.generate_token(
                account.id, account.username, expires_in=0
            )

            # Log the successful login.
            success_context = {
                'user_id': account.id,
                'username': account.username,
                'ip_address': request.remote_addr,
            }
            Logger.info("用户登录成功", success_context)

            response = {
                'access_token': access_token,
                'token_type': 'Bearer',
                'expires_in': 0,
                'user': account.to_dict_public(),
            }
            return response
        except ValueError as exc:
            reason = str(exc)

            # Log the failed login together with the reason.
            failure_context = {
                'username': payload.get('username'),
                'error': reason,
                'ip_address': request.remote_addr,
            }
            Logger.warning("用户登录失败", failure_context)
            api.abort(401, reason)
@api.route('/me')
class UserProfileResource(Resource):
    """Resource mounted at ``/me`` for reading and updating the current user."""

    @token_required
    @api.marshal_with(user_model)
    @log_request_info
    def get(self):
        """Return the current user's profile.

        Returns:
            ``g.current_user.to_dict()``.
        """
        Logger.info("获取用户信息", {
            'user_id': g.current_user.id,
            'username': g.current_user.username
        })
        return g.current_user.to_dict()

    @token_required
    @api.expect(api.model('UpdateProfile', {
        'username': fields.String(description='用户名'),
        'email': fields.String(description='邮箱')
    }))
    @api.marshal_with(user_model)
    @api.response(400, '更新失败')
    @log_request_info
    def put(self):
        """Update the current user's username and/or email.

        Reads the optional ``username`` and ``email`` keys from the JSON body
        and passes them (``None`` when absent) to ``UserService.update_user``
        for ``g.current_user.id``.

        Returns:
            ``updated_user.to_dict()`` on success.

        Raises:
            Aborts with HTTP 400 when the body is not a JSON object, or when
            a ``ValueError`` is raised inside the ``try`` block.
        """
        data = request.get_json()

        # This endpoint's ``expect`` does not pass validate=True, so the body
        # may parse to ``None`` (JSON ``null``) or to a non-object such as a
        # list. Reject that up front instead of failing on ``data.get`` with
        # an AttributeError (HTTP 500).
        if not isinstance(data, dict):
            api.abort(400, '请求体必须是JSON对象')

        service = UserService(db.session)

        Logger.info("用户信息更新尝试", {
            'user_id': g.current_user.id,
            'username': g.current_user.username,
            'new_username': data.get('username'),
            'new_email': data.get('email')
        })

        try:
            updated_user = service.update_user(
                user_id=g.current_user.id,
                username=data.get('username'),
                email=data.get('email')
            )

            Logger.info("用户信息更新成功", {
                'user_id': updated_user.id,
                'username': updated_user.username,
                'email': updated_user.email
            })

            return updated_user.to_dict()
        except ValueError as e:
            Logger.warning("用户信息更新失败", {
                'user_id': g.current_user.id,
                'error': str(e)
            })
            api.abort(400, str(e))
@api.route('/change-password')
class ChangePasswordResource(Resource):
    """Resource mounted at ``/change-password`` for the current user."""

    @token_required
    @api.expect(change_password_model, validate=True)
    @api.response(200, '密码修改成功')
    @api.response(400, '密码修改失败')
    @log_request_info
    def post(self):
        """Change the current user's password.

        Reads ``old_password`` and ``new_password`` from the JSON body and
        passes them to ``UserService.change_password`` for
        ``g.current_user.id``.

        Returns:
            ``({'message': '密码修改成功'}, 200)`` on success.

        Raises:
            Aborts with HTTP 400 (carrying the error text) when a
            ``ValueError`` is raised inside the ``try`` block.
        """
        payload = request.get_json()
        user_service = UserService(db.session)

        attempt_context = {
            'user_id': g.current_user.id,
            'username': g.current_user.username,
        }
        Logger.info("用户密码修改尝试", attempt_context)

        try:
            user_service.change_password(
                user_id=g.current_user.id,
                old_password=payload['old_password'],
                new_password=payload['new_password'],
            )

            success_context = {
                'user_id': g.current_user.id,
                'username': g.current_user.username,
            }
            Logger.info("用户密码修改成功", success_context)

            return {'message': '密码修改成功'}, 200
        except ValueError as exc:
            reason = str(exc)
            failure_context = {
                'user_id': g.current_user.id,
                'username': g.current_user.username,
                'error': reason,
            }
            Logger.warning("用户密码修改失败", failure_context)
            api.abort(400, reason)
@api.route('/logout')
class LogoutResource(Resource):
    """Resource mounted at ``/logout``."""

    @token_required
    @api.response(200, '登出成功')
    @log_request_info
    def post(self):
        """Log the user out (the client is expected to delete its token).

        This handler only logs the event and returns a confirmation message;
        nothing here invalidates the token.

        Returns:
            ``({'message': '登出成功'}, 200)``.
        """
        current = g.current_user
        logout_context = {
            'user_id': current.id,
            'username': current.username,
        }
        Logger.info("用户登出", logout_context)

        body = {'message': '登出成功'}
        return body, 200
@api.route('/verify-token')
class VerifyTokenResource(Resource):
    """Resource mounted at ``/verify-token``."""

    @token_required
    @api.marshal_with(user_model)
    @log_request_info
    def get(self):
        """Check that the presented token is valid.

        The handler itself only logs and echoes the current user; the actual
        check is presumably done by ``token_required`` before this runs.

        Returns:
            ``g.current_user.to_dict()``.
        """
        current = g.current_user
        verify_context = {
            'user_id': current.id,
            'username': current.username,
        }
        Logger.info("Token验证", verify_context)
        return current.to_dict()
@api.route('/permanent-token')
class PermanentTokenResource(Resource):
    """Resource mounted at ``/permanent-token``."""

    @token_required
    @api.marshal_with(token_response_model)
    @log_request_info
    def post(self):
        """Exchange the current valid token for a long-lived one.

        Calls ``JWTUtils.generate_token`` with ``expires_in=0`` for the
        current user; the original author documents this as a token with no
        expiry.

        Returns:
            A dict with ``access_token``, ``token_type``, ``expires_in`` and
            the user's ``to_dict_public()`` under ``user``.
        """
        current = g.current_user

        issue_context = {
            'user_id': current.id,
            'username': current.username,
        }
        Logger.info("生成长期Token", issue_context)

        long_lived_token = JWTUtils.generate_token(
            current.id, current.username, expires_in=0
        )
        response = {
            'access_token': long_lived_token,
            'token_type': 'Bearer',
            'expires_in': 0,
            'user': current.to_dict_public(),
        }
        return response
|