| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- from sqlalchemy.orm import Session
- from sqlalchemy import desc
- from typing import Optional
- from datetime import datetime
- from app.models.login_log import LoginLog, LoginMethod, AuthType
- from app.schemas.login_log import LoginLogCreate
- class LoginLogService:
- @staticmethod
- def create_log(
- db: Session,
- log_in: LoginLogCreate
- ) -> LoginLog:
- # TODO: Implement IP to Location lookup here if needed
- # if log_in.ip_address and not log_in.location:
- # log_in.location = get_location_by_ip(log_in.ip_address)
-
- log = LoginLog(**log_in.model_dump())
- db.add(log)
- db.commit()
- db.refresh(log)
- return log
- @staticmethod
- def get_logs(
- db: Session,
- skip: int = 0,
- limit: int = 20,
- mobile: Optional[str] = None,
- ip_address: Optional[str] = None,
- status: Optional[int] = None, # 1 or 0
- login_method: Optional[LoginMethod] = None,
- auth_type: Optional[AuthType] = None,
- start_date: Optional[datetime] = None,
- end_date: Optional[datetime] = None
- ):
- query = db.query(LoginLog)
-
- if mobile:
- query = query.filter(LoginLog.mobile.ilike(f"%{mobile}%"))
-
- if ip_address:
- query = query.filter(LoginLog.ip_address.ilike(f"%{ip_address}%"))
-
- if status is not None:
- query = query.filter(LoginLog.is_success == status)
-
- if login_method:
- query = query.filter(LoginLog.login_method == login_method)
-
- if auth_type:
- query = query.filter(LoginLog.auth_type == auth_type)
-
- if start_date:
- query = query.filter(LoginLog.created_at >= start_date)
-
- if end_date:
- query = query.filter(LoginLog.created_at <= end_date)
-
- total = query.count()
- logs = query.order_by(desc(LoginLog.created_at)).offset(skip).limit(limit).all()
-
- return total, logs
|