login_log_service.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. from sqlalchemy.orm import Session
  2. from sqlalchemy import desc
  3. from typing import Optional
  4. from datetime import datetime
  5. from app.models.login_log import LoginLog, LoginMethod, AuthType
  6. from app.schemas.login_log import LoginLogCreate
  7. class LoginLogService:
  8. @staticmethod
  9. def create_log(
  10. db: Session,
  11. log_in: LoginLogCreate
  12. ) -> LoginLog:
  13. # TODO: Implement IP to Location lookup here if needed
  14. # if log_in.ip_address and not log_in.location:
  15. # log_in.location = get_location_by_ip(log_in.ip_address)
  16. log = LoginLog(**log_in.model_dump())
  17. db.add(log)
  18. db.commit()
  19. db.refresh(log)
  20. return log
  21. @staticmethod
  22. def get_logs(
  23. db: Session,
  24. skip: int = 0,
  25. limit: int = 20,
  26. mobile: Optional[str] = None,
  27. ip_address: Optional[str] = None,
  28. status: Optional[int] = None, # 1 or 0
  29. login_method: Optional[LoginMethod] = None,
  30. auth_type: Optional[AuthType] = None,
  31. start_date: Optional[datetime] = None,
  32. end_date: Optional[datetime] = None
  33. ):
  34. query = db.query(LoginLog)
  35. if mobile:
  36. query = query.filter(LoginLog.mobile.ilike(f"%{mobile}%"))
  37. if ip_address:
  38. query = query.filter(LoginLog.ip_address.ilike(f"%{ip_address}%"))
  39. if status is not None:
  40. query = query.filter(LoginLog.is_success == status)
  41. if login_method:
  42. query = query.filter(LoginLog.login_method == login_method)
  43. if auth_type:
  44. query = query.filter(LoginLog.auth_type == auth_type)
  45. if start_date:
  46. query = query.filter(LoginLog.created_at >= start_date)
  47. if end_date:
  48. query = query.filter(LoginLog.created_at <= end_date)
  49. total = query.count()
  50. logs = query.order_by(desc(LoginLog.created_at)).offset(skip).limit(limit).all()
  51. return total, logs