| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
from typing import Optional

from sqlalchemy.orm import Session

from app.dao.user_dao import UserDAO
from app.model.user_model import User
from app.utils.logger import Logger, ErrorHandler
class UserService:
    """Service-layer operations on user accounts.

    Delegates persistence to a ``UserDAO`` built on the supplied session and
    adds input validation, uniqueness checks and logging on top of it.
    Validation failures are always reported as ``ValueError``.
    """

    # Single source of truth for the password-length rule, used by both
    # create_user and change_password so the two checks cannot drift apart.
    MIN_PASSWORD_LENGTH = 6

    def __init__(self, db_session: "Session"):
        """Create the service with a ``UserDAO`` bound to *db_session*."""
        self.user_dao = UserDAO(db_session)

    def get_all_users(self):
        """Return the result of ``UserDAO.get_all()`` unchanged."""
        return self.user_dao.get_all()

    def get_user_by_id(self, user_id: int):
        """Return the user with the given id.

        Raises:
            ValueError: if the DAO returns no (or a falsy) user.
        """
        user = self.user_dao.get_by_id(user_id)
        if not user:
            raise ValueError(f"User with id {user_id} not found.")
        return user

    def get_user_by_username(self, username: str):
        """Return the user with the given username.

        Raises:
            ValueError: if the DAO returns no (or a falsy) user.
        """
        user = self.user_dao.get_by_username(username)
        if not user:
            raise ValueError(f"User with username {username} not found.")
        return user

    def create_user(self, username: str, email: str, password: str):
        """Validate the input, then create and return a new user.

        Raises:
            ValueError: if any argument is empty, the password is shorter
                than ``MIN_PASSWORD_LENGTH``, or the username or email is
                already in use.

        Every exception raised inside the method (including the validation
        errors above) is passed to ``Logger.error`` and re-raised unchanged.
        """
        Logger.info("开始创建用户", {
            'username': username,
            'email': email,
        })

        try:
            # All three fields are mandatory; the password itself is never
            # logged, only whether one was supplied.
            if not username or not email or not password:
                Logger.warning("用户创建失败:缺少必要参数", {
                    'username': username,
                    'email': email,
                    'has_password': bool(password),
                })
                raise ValueError("Username, email and password are required.")

            if len(password) < self.MIN_PASSWORD_LENGTH:
                Logger.warning("用户创建失败:密码长度不足", {
                    'username': username,
                    'password_length': len(password),
                })
                raise ValueError(
                    f"Password must be at least {self.MIN_PASSWORD_LENGTH} characters long."
                )

            # NOTE(review): the existence checks and the insert are separate
            # DAO calls; presumably a database unique constraint backs them
            # against concurrent creation -- confirm.
            if self.user_dao.exists_by_username(username):
                Logger.warning("用户创建失败:用户名已存在", {
                    'username': username,
                })
                raise ValueError("Username already exists.")

            if self.user_dao.exists_by_email(email):
                Logger.warning("用户创建失败:邮箱已存在", {
                    'email': email,
                })
                raise ValueError("Email already exists.")

            new_user = User(username=username, email=email)
            new_user.set_password(password)

            created_user = self.user_dao.create(new_user)

            Logger.info("用户创建成功", {
                'user_id': created_user.id,
                'username': created_user.username,
                'email': created_user.email,
            })

            return created_user

        except Exception as e:
            Logger.error("用户创建过程中发生异常", e, {
                'username': username,
                'email': email,
            })
            raise

    def authenticate_user(self, username: str, password: str):
        """Check a username/password pair and return the matching user.

        The password is verified *before* the ``is_active`` flag is
        inspected, so the "account disabled" response is only given to a
        caller who supplied the correct password. Unknown usernames and
        wrong passwords share one generic message.

        Raises:
            ValueError: if either argument is empty, the credentials do not
                match, or the account is disabled.

        Every exception raised inside the method is passed to
        ``Logger.error`` and re-raised unchanged.
        """
        Logger.info("开始用户认证", {
            'username': username,
        })

        try:
            if not username or not password:
                Logger.warning("用户认证失败:缺少用户名或密码", {
                    'username': username,
                    'has_password': bool(password),
                })
                raise ValueError("Username and password are required.")

            user = self.user_dao.get_by_username(username)
            if not user:
                Logger.warning("用户认证失败:用户不存在", {
                    'username': username,
                })
                raise ValueError("Invalid username or password.")

            # Password first: answering "disabled" without proof of the
            # password would let anyone probe account existence and status.
            if not user.check_password(password):
                Logger.warning("用户认证失败:密码错误", {
                    'user_id': user.id,
                    'username': username,
                })
                raise ValueError("Invalid username or password.")

            if not user.is_active:
                Logger.warning("用户认证失败:账户已禁用", {
                    'user_id': user.id,
                    'username': username,
                })
                raise ValueError("User account is disabled.")

            Logger.info("用户认证成功", {
                'user_id': user.id,
                'username': user.username,
            })

            return user

        except Exception as e:
            Logger.error("用户认证过程中发生异常", e, {
                'username': username,
            })
            raise

    def update_user(
        self,
        user_id: int,
        username: Optional[str] = None,
        email: Optional[str] = None,
        is_active: Optional[bool] = None,
    ):
        """Update selected fields of a user and return the DAO's result.

        Arguments left as ``None`` are not touched. All requested changes are
        validated before any attribute is assigned, so a rejected update
        leaves the loaded user object exactly as it was.

        Raises:
            ValueError: if the user does not exist, a new username or email
                is empty, or a new username or email is already in use.
        """
        user = self.get_user_by_id(user_id)

        # Phase 1: validate. Nothing is written to `user` in this phase.
        username_changes = username is not None and username != user.username
        if username_changes:
            if not username:
                raise ValueError("Username cannot be empty.")
            if self.user_dao.exists_by_username(username):
                raise ValueError("Username already exists.")

        email_changes = email is not None and email != user.email
        if email_changes:
            if not email:
                raise ValueError("Email cannot be empty.")
            if self.user_dao.exists_by_email(email):
                raise ValueError("Email already exists.")

        # Phase 2: apply. Reached only when every check above passed.
        if username_changes:
            user.username = username
        if email_changes:
            user.email = email
        if is_active is not None:
            user.is_active = is_active

        return self.user_dao.update(user)

    def change_password(self, user_id: int, old_password: str, new_password: str):
        """Replace a user's password after verifying the current one.

        Raises:
            ValueError: if the user does not exist, *old_password* does not
                match, or *new_password* is missing or shorter than
                ``MIN_PASSWORD_LENGTH``.

        Every exception raised inside the method is passed to
        ``Logger.error`` and re-raised unchanged.
        """
        Logger.info("开始修改用户密码", {
            'user_id': user_id,
        })

        try:
            user = self.get_user_by_id(user_id)

            if not user.check_password(old_password):
                Logger.warning("密码修改失败:当前密码错误", {
                    'user_id': user_id,
                    'username': user.username,
                })
                raise ValueError("Current password is incorrect.")

            # A missing (None/empty) password is treated as too short so the
            # caller gets the usual ValueError rather than a TypeError from
            # len(None).
            if not new_password or len(new_password) < self.MIN_PASSWORD_LENGTH:
                Logger.warning("密码修改失败:新密码长度不足", {
                    'user_id': user_id,
                    'username': user.username,
                    'new_password_length': len(new_password) if new_password else 0,
                })
                raise ValueError(
                    f"New password must be at least {self.MIN_PASSWORD_LENGTH} characters long."
                )

            user.set_password(new_password)
            updated_user = self.user_dao.update(user)

            Logger.info("用户密码修改成功", {
                'user_id': user_id,
                'username': user.username,
            })

            return updated_user

        except Exception as e:
            Logger.error("密码修改过程中发生异常", e, {
                'user_id': user_id,
            })
            raise

    def delete_user(self, user_id: int) -> bool:
        """Delete the user with the given id and return ``True``.

        Raises:
            ValueError: if the user does not exist.
        """
        user = self.get_user_by_id(user_id)
        self.user_dao.delete(user)
        return True
|