from datetime import timedelta from typing import Any import uuid import logging import urllib.parse from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import RedirectResponse from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session from backend.app.core import security from backend.app.core.config import settings from backend.app.core.database import get_db from backend.app.models import sql_models from backend.app.schemas import schemas from backend.app.services.simple_auth import SimpleAuthService logger = logging.getLogger(__name__) router = APIRouter() @router.post("/login", response_model=schemas.Token) def login_access_token( db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends() ) -> Any: user = db.query(sql_models.User).filter(sql_models.User.username == form_data.username).first() if not user or not security.verify_password(form_data.password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", ) access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = security.create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return { "access_token": access_token, "token_type": "bearer", "is_superuser": user.is_superuser if hasattr(user, "is_superuser") else False, "username": user.username } @router.get("/auth/simple-callback") async def simple_auth_callback( ticket: str, db: Session = Depends(get_db) ) -> Any: """ Callback for Simple Auth Platform. Receives ticket (GET param), validates it, logs in/creates local user, and redirects to frontend with token. """ logger.info(f"Received simple-callback with ticket: {ticket}") validation_result = await SimpleAuthService.validate_ticket(ticket) logger.info(f"Ticket validation result: {validation_result}") if not validation_result or not validation_result.get("valid"): logger.error(f"Invalid ticket: {ticket}") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid ticket" ) # Determine local username # Priority: mapped_key > mobile > mapped_email > user_id username = validation_result.get("mapped_key") if not username: username = validation_result.get("mobile") if not username: username = validation_result.get("mapped_email") if not username: username = str(validation_result.get("user_id")) if not username: logger.error("Could not determine user identity from auth response") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Could not determine user identity from auth response" ) logger.info(f"Identified user: {username}") # Check if user exists user = db.query(sql_models.User).filter(sql_models.User.username == username).first() if not user: logger.info(f"User {username} not found. Creating new user.") # Create new user # Generate a random password since they use external auth random_password = uuid.uuid4().hex hashed_password = security.get_password_hash(random_password) user = sql_models.User( username=username, hashed_password=hashed_password, is_active=True, is_superuser=False ) db.add(user) db.commit() db.refresh(user) logger.info(f"User {username} created successfully.") else: logger.info(f"User {username} found.") # Generate token access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = security.create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) # Redirect to frontend root with token in query param # URL Encode parameters to ensure special characters (like Chinese or +) are handled correctly encoded_token = urllib.parse.quote(access_token) encoded_username = urllib.parse.quote(user.username) is_superuser_str = "true" if user.is_superuser else "false" redirect_url = f"/?token={encoded_token}&username={encoded_username}&superuser={is_superuser_str}" logger.info(f"Redirecting to frontend: {redirect_url}") return RedirectResponse(url=redirect_url)