from sqlalchemy.orm import Session from sqlalchemy import desc from typing import Optional from datetime import datetime from app.models.login_log import LoginLog, LoginMethod, AuthType from app.schemas.login_log import LoginLogCreate class LoginLogService: @staticmethod def create_log( db: Session, log_in: LoginLogCreate ) -> LoginLog: # TODO: Implement IP to Location lookup here if needed # if log_in.ip_address and not log_in.location: # log_in.location = get_location_by_ip(log_in.ip_address) log = LoginLog(**log_in.model_dump()) db.add(log) db.commit() db.refresh(log) return log @staticmethod def get_logs( db: Session, skip: int = 0, limit: int = 20, mobile: Optional[str] = None, ip_address: Optional[str] = None, status: Optional[int] = None, # 1 or 0 login_method: Optional[LoginMethod] = None, auth_type: Optional[AuthType] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None ): query = db.query(LoginLog) if mobile: query = query.filter(LoginLog.mobile.ilike(f"%{mobile}%")) if ip_address: query = query.filter(LoginLog.ip_address.ilike(f"%{ip_address}%")) if status is not None: query = query.filter(LoginLog.is_success == status) if login_method: query = query.filter(LoginLog.login_method == login_method) if auth_type: query = query.filter(LoginLog.auth_type == auth_type) if start_date: query = query.filter(LoginLog.created_at >= start_date) if end_date: query = query.filter(LoginLog.created_at <= end_date) total = query.count() logs = query.order_by(desc(LoginLog.created_at)).offset(skip).limit(limit).all() return total, logs