| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- from datetime import timedelta
- from typing import Any
- import uuid
- import logging
- import urllib.parse
- from fastapi import APIRouter, Depends, HTTPException, status
- from fastapi.responses import RedirectResponse
- from fastapi.security import OAuth2PasswordRequestForm
- from sqlalchemy.orm import Session
- from backend.app.core import security
- from backend.app.core.config import settings
- from backend.app.core.database import get_db
- from backend.app.models import sql_models
- from backend.app.schemas import schemas
- from backend.app.services.simple_auth import SimpleAuthService
- logger = logging.getLogger(__name__)
- router = APIRouter()
- @router.post("/login", response_model=schemas.Token)
- def login_access_token(
- db: Session = Depends(get_db),
- form_data: OAuth2PasswordRequestForm = Depends()
- ) -> Any:
- user = db.query(sql_models.User).filter(sql_models.User.username == form_data.username).first()
- if not user or not security.verify_password(form_data.password, user.hashed_password):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Incorrect username or password",
- )
-
- access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
- access_token = security.create_access_token(
- data={"sub": user.username}, expires_delta=access_token_expires
- )
- return {
- "access_token": access_token,
- "token_type": "bearer",
- "is_superuser": user.is_superuser if hasattr(user, "is_superuser") else False,
- "username": user.username
- }
- @router.get("/auth/simple-callback")
- async def simple_auth_callback(
- ticket: str,
- db: Session = Depends(get_db)
- ) -> Any:
- """
- Callback for Simple Auth Platform.
- Receives ticket (GET param), validates it, logs in/creates local user,
- and redirects to frontend with token.
- """
- logger.info(f"Received simple-callback with ticket: {ticket}")
-
- validation_result = await SimpleAuthService.validate_ticket(ticket)
- logger.info(f"Ticket validation result: {validation_result}")
-
- if not validation_result or not validation_result.get("valid"):
- logger.error(f"Invalid ticket: {ticket}")
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="Invalid ticket"
- )
-
- # Determine local username
- # Priority: mapped_key > mobile > mapped_email > user_id
- username = validation_result.get("mapped_key")
- if not username:
- username = validation_result.get("mobile")
- if not username:
- username = validation_result.get("mapped_email")
- if not username:
- username = str(validation_result.get("user_id"))
-
- if not username:
- logger.error("Could not determine user identity from auth response")
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail="Could not determine user identity from auth response"
- )
-
- logger.info(f"Identified user: {username}")
- # Check if user exists
- user = db.query(sql_models.User).filter(sql_models.User.username == username).first()
-
- if not user:
- logger.info(f"User {username} not found. Creating new user.")
- # Create new user
- # Generate a random password since they use external auth
- random_password = uuid.uuid4().hex
- hashed_password = security.get_password_hash(random_password)
-
- user = sql_models.User(
- username=username,
- hashed_password=hashed_password,
- is_active=True,
- is_superuser=False
- )
- db.add(user)
- db.commit()
- db.refresh(user)
- logger.info(f"User {username} created successfully.")
- else:
- logger.info(f"User {username} found.")
-
- # Generate token
- access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
- access_token = security.create_access_token(
- data={"sub": user.username}, expires_delta=access_token_expires
- )
-
- # Redirect to frontend root with token in query param
- # URL Encode parameters to ensure special characters (like Chinese or +) are handled correctly
- encoded_token = urllib.parse.quote(access_token)
- encoded_username = urllib.parse.quote(user.username)
- is_superuser_str = "true" if user.is_superuser else "false"
-
- redirect_url = f"/?token={encoded_token}&username={encoded_username}&superuser={is_superuser_str}"
- logger.info(f"Redirecting to frontend: {redirect_url}")
-
- return RedirectResponse(url=redirect_url)
|