from typing import Optional

from sqlalchemy.orm import Session

from app.dao.user_dao import UserDAO
from app.model.user_model import User
from app.utils.logger import Logger, ErrorHandler


class UserService:
    """Business-logic layer for user accounts, delegating storage to ``UserDAO``.

    Lookup and validation failures are reported by raising ``ValueError``.
    """

    # Minimum accepted password length. Shared by create_user and
    # change_password so the two rules cannot drift apart.
    MIN_PASSWORD_LENGTH = 6

    def __init__(self, db_session: Session):
        """Build the service's DAO on top of ``db_session``."""
        self.user_dao = UserDAO(db_session)

    def get_all_users(self):
        """Return whatever ``self.user_dao.get_all()`` yields."""
        return self.user_dao.get_all()

    def get_user_by_id(self, user_id: int):
        """Fetch a user by ID.

        Raises:
            ValueError: if the DAO returns a falsy result for ``user_id``.
        """
        user = self.user_dao.get_by_id(user_id)
        if not user:
            raise ValueError(f"User with id {user_id} not found.")
        return user

    def get_user_by_username(self, username: str):
        """Fetch a user by username.

        Raises:
            ValueError: if the DAO returns a falsy result for ``username``.
        """
        user = self.user_dao.get_by_username(username)
        if not user:
            raise ValueError(f"User with username {username} not found.")
        return user

    def create_user(self, username: str, email: str, password: str):
        """Validate the input and create a new user.

        Args:
            username: Desired username; must be non-empty and not yet taken.
            email: E-mail address; must be non-empty and not yet taken.
            password: Plain-text password of at least ``MIN_PASSWORD_LENGTH``
                characters; handed to ``User.set_password``.

        Returns:
            The object returned by ``self.user_dao.create``.

        Raises:
            ValueError: if a field is missing, the password is too short, or
                the username / e-mail already exists.
        """
        Logger.info("开始创建用户", {
            'username': username,
            'email': email
        })

        try:
            # Validate input.
            if not username or not email or not password:
                Logger.warning("用户创建失败：缺少必要参数", {
                    'username': username,
                    'email': email,
                    'has_password': bool(password)
                })
                raise ValueError("Username, email and password are required.")

            if len(password) < self.MIN_PASSWORD_LENGTH:
                Logger.warning("用户创建失败：密码长度不足", {
                    'username': username,
                    'password_length': len(password)
                })
                raise ValueError(
                    f"Password must be at least {self.MIN_PASSWORD_LENGTH} "
                    "characters long."
                )

            # NOTE(review): the exists-then-create sequence below is not
            # atomic; presumably a unique constraint backs it up -- confirm.

            # Reject a username that is already taken.
            if self.user_dao.exists_by_username(username):
                Logger.warning("用户创建失败：用户名已存在", {
                    'username': username
                })
                raise ValueError("Username already exists.")

            # Reject an e-mail address that is already taken.
            if self.user_dao.exists_by_email(email):
                Logger.warning("用户创建失败：邮箱已存在", {
                    'email': email
                })
                raise ValueError("Email already exists.")

            # Create the new user.
            new_user = User(username=username, email=email)
            new_user.set_password(password)

            created_user = self.user_dao.create(new_user)

            Logger.info("用户创建成功", {
                'user_id': created_user.id,
                'username': created_user.username,
                'email': created_user.email
            })

            return created_user

        except Exception as e:
            # Every failure, including the validation errors raised above,
            # is logged here at error level and re-raised unchanged.
            Logger.error("用户创建过程中发生异常", e, {
                'username': username,
                'email': email
            })
            raise

    def authenticate_user(self, username: str, password: str):
        """Verify login credentials and return the matching user.

        Args:
            username: Username to look up.
            password: Plain-text password checked via ``user.check_password``.

        Returns:
            The user object returned by the DAO.

        Raises:
            ValueError: if either argument is empty, no such user exists, the
                user's ``is_active`` flag is falsy, or the password check
                fails.
        """
        Logger.info("开始用户认证", {
            'username': username
        })

        try:
            if not username or not password:
                Logger.warning("用户认证失败：缺少用户名或密码", {
                    'username': username,
                    'has_password': bool(password)
                })
                raise ValueError("Username and password are required.")

            user = self.user_dao.get_by_username(username)
            if not user:
                Logger.warning("用户认证失败：用户不存在", {
                    'username': username
                })
                # Same message as a wrong password.
                raise ValueError("Invalid username or password.")

            if not user.is_active:
                Logger.warning("用户认证失败：账户已禁用", {
                    'user_id': user.id,
                    'username': username
                })
                raise ValueError("User account is disabled.")

            if not user.check_password(password):
                Logger.warning("用户认证失败：密码错误", {
                    'user_id': user.id,
                    'username': username
                })
                raise ValueError("Invalid username or password.")

            Logger.info("用户认证成功", {
                'user_id': user.id,
                'username': user.username
            })

            return user

        except Exception as e:
            # Logs every failure (validation errors included) and re-raises.
            Logger.error("用户认证过程中发生异常", e, {
                'username': username
            })
            raise

    def update_user(
        self,
        user_id: int,
        username: Optional[str] = None,
        email: Optional[str] = None,
        is_active: Optional[bool] = None,
    ):
        """Update selected fields of an existing user.

        Arguments left as ``None`` are not touched. All validation runs
        before any attribute is assigned, so a rejected call leaves the
        loaded user object unmodified.

        Returns:
            The result of ``self.user_dao.update``.

        Raises:
            ValueError: if the user does not exist, a new username / e-mail
                is empty, or it is already taken.
        """
        user = self.get_user_by_id(user_id)

        username_changed = username is not None and username != user.username
        email_changed = email is not None and email != user.email

        if username_changed:
            # create_user refuses empty usernames; keep updates consistent.
            if not username:
                raise ValueError("Username cannot be empty.")
            if self.user_dao.exists_by_username(username):
                raise ValueError("Username already exists.")

        if email_changed:
            # create_user refuses empty e-mails; keep updates consistent.
            if not email:
                raise ValueError("Email cannot be empty.")
            if self.user_dao.exists_by_email(email):
                raise ValueError("Email already exists.")

        if username_changed:
            user.username = username
        if email_changed:
            user.email = email
        if is_active is not None:
            user.is_active = is_active

        return self.user_dao.update(user)

    def change_password(self, user_id: int, old_password: str, new_password: str):
        """Change a user's password after verifying the current one.

        Args:
            user_id: ID of the user whose password is changed.
            old_password: Current password, checked via ``check_password``.
            new_password: Replacement password of at least
                ``MIN_PASSWORD_LENGTH`` characters.

        Returns:
            The result of ``self.user_dao.update``.

        Raises:
            ValueError: if the user does not exist, ``old_password`` does not
                match, or ``new_password`` is missing or too short.
        """
        Logger.info("开始修改用户密码", {
            'user_id': user_id
        })

        try:
            user = self.get_user_by_id(user_id)

            if not user.check_password(old_password):
                Logger.warning("密码修改失败：当前密码错误", {
                    'user_id': user_id,
                    'username': user.username
                })
                raise ValueError("Current password is incorrect.")

            # A missing (None / "") password counts as length 0, so it is
            # rejected with ValueError instead of a TypeError from len(None).
            new_password_length = len(new_password) if new_password else 0
            if new_password_length < self.MIN_PASSWORD_LENGTH:
                Logger.warning("密码修改失败：新密码长度不足", {
                    'user_id': user_id,
                    'username': user.username,
                    'new_password_length': new_password_length
                })
                raise ValueError(
                    f"New password must be at least {self.MIN_PASSWORD_LENGTH} "
                    "characters long."
                )

            user.set_password(new_password)
            updated_user = self.user_dao.update(user)

            Logger.info("用户密码修改成功", {
                'user_id': user_id,
                'username': user.username
            })

            return updated_user

        except Exception as e:
            # Logs every failure (validation errors included) and re-raises.
            Logger.error("密码修改过程中发生异常", e, {
                'user_id': user_id
            })
            raise

    def delete_user(self, user_id: int):
        """Delete the user with ``user_id`` and return ``True``.

        Raises:
            ValueError: if no user with ``user_id`` exists.
        """
        user = self.get_user_by_id(user_id)
        self.user_dao.delete(user)
        return True