from typing import Optional, List import json from fastapi import APIRouter, Depends, HTTPException, Body from sqlalchemy.orm import Session from pydantic import BaseModel from app.api.v1 import deps from app.core import security from app.models.user import User, UserRole, UserStatus from app.models.application import Application from app.models.mapping import AppUserMapping from app.schemas.simple_auth import ( TicketExchangeRequest, TicketExchangeResponse, TicketValidateRequest, TicketValidateResponse, PasswordLoginRequest, PasswordLoginResponse, UserRegisterRequest, AdminPasswordResetRequest, AdminPasswordResetResponse, ChangePasswordRequest, MyMappingsResponse, UserMappingResponse, UserPromoteRequest, SsoLoginRequest, SsoLoginResponse ) from app.services.signature_service import SignatureService from app.services.ticket_service import TicketService router = APIRouter() @router.post("/login", response_model=PasswordLoginResponse, summary="密码登录") def login_with_password( req: PasswordLoginRequest, db: Session = Depends(deps.get_db), ): """ 1. 如果提供 app_id:应用 SSO 登录,返回 ticket。 2. 如果未提供 app_id:统一认证平台登录,返回 access_token。 """ # --- Platform Login --- if not req.app_id: # Find user by mobile only user = db.query(User).filter(User.mobile == req.identifier, User.is_deleted == 0).first() if not user: raise HTTPException(status_code=404, detail="用户未找到") if not security.verify_password(req.password, user.password_hash): raise HTTPException(status_code=401, detail="密码错误") if user.status != UserStatus.ACTIVE: raise HTTPException(status_code=400, detail="用户已禁用") # Generate JWT Access Token access_token = security.create_access_token(user.id) return { "access_token": access_token, "token_type": "bearer", "role": user.role } # --- App SSO Login --- # 1. Verify App app = db.query(Application).filter(Application.app_id == req.app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") # 2. Verify Signature (Optional but recommended for server-side calls) if req.sign and req.timestamp: params = { "app_id": req.app_id, "identifier": req.identifier, "password": req.password, "timestamp": req.timestamp, "sign": req.sign } if not SignatureService.verify_signature(app.app_secret, params, req.sign): raise HTTPException(status_code=400, detail="签名无效") # 3. Find User user = None # Try by mobile user = db.query(User).filter(User.mobile == req.identifier, User.is_deleted == 0).first() if not user: # Try by mapping mapping = db.query(AppUserMapping).filter( AppUserMapping.app_id == app.id, (AppUserMapping.mapped_key == req.identifier) | (AppUserMapping.mapped_email == req.identifier) ).first() if mapping: user = db.query(User).filter(User.id == mapping.user_id, User.is_deleted == 0).first() if not user: raise HTTPException(status_code=404, detail="用户未找到") if user.status != UserStatus.ACTIVE: raise HTTPException(status_code=400, detail="用户已禁用") # 4. Verify Password if not security.verify_password(req.password, user.password_hash): raise HTTPException(status_code=401, detail="密码错误") # 5. Generate Ticket (Self-Targeting) ticket = TicketService.generate_ticket(user.id, req.app_id) return {"ticket": ticket} @router.post("/register", response_model=PasswordLoginResponse, summary="用户注册") def register_user( req: UserRegisterRequest, db: Session = Depends(deps.get_db), ): """ 注册新用户 (默认为普通用户)。 """ # Force role to ORDINARY_USER role = UserRole.ORDINARY_USER existing_user = db.query(User).filter(User.mobile == req.mobile, User.is_deleted == 0).first() if existing_user: raise HTTPException(status_code=400, detail="手机号已注册") new_user = User( mobile=req.mobile, password_hash=security.get_password_hash(req.password), status=UserStatus.ACTIVE, role=role ) db.add(new_user) db.commit() db.refresh(new_user) # Auto-login after registration (return token) access_token = security.create_access_token(new_user.id) return { "access_token": access_token, "token_type": "bearer", "role": new_user.role } @router.post("/admin/reset-password", response_model=AdminPasswordResetResponse, summary="管理员重置密码") def admin_reset_password( req: AdminPasswordResetRequest, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """ 超级管理员重置用户密码。 随机生成8位密码,只显示一次。 """ if current_user.role != UserRole.SUPER_ADMIN: raise HTTPException(status_code=403, detail="权限不足") target_user = db.query(User).filter(User.id == req.user_id).first() if not target_user: raise HTTPException(status_code=404, detail="用户未找到") # Generate random password (alphanumeric only) new_pwd = security.generate_alphanumeric_password(8) target_user.password_hash = security.get_password_hash(new_pwd) db.add(target_user) db.commit() return {"new_password": new_pwd} @router.post("/admin/promote", summary="提升用户角色") def promote_user( req: UserPromoteRequest, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): if current_user.role != UserRole.SUPER_ADMIN: raise HTTPException(status_code=403, detail="权限不足") if req.new_role not in [UserRole.SUPER_ADMIN, UserRole.DEVELOPER]: raise HTTPException(status_code=400, detail="只能提升为管理员 or 开发者") target_user = db.query(User).filter(User.id == req.user_id).first() if not target_user: raise HTTPException(status_code=404, detail="用户未找到") target_user.role = req.new_role db.add(target_user) db.commit() return {"message": "success"} @router.get("/me/mappings", response_model=MyMappingsResponse, summary="我的映射") def get_my_mappings( skip: int = 0, limit: int = 10, app_name: str = None, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): query = db.query(AppUserMapping).join(Application).filter(AppUserMapping.user_id == current_user.id) if app_name: query = query.filter(Application.app_name.ilike(f"%{app_name}%")) total = query.count() mappings = query.order_by(AppUserMapping.id.desc()).offset(skip).limit(limit).all() result = [] for m in mappings: result.append(UserMappingResponse( app_name=m.application.app_name if m.application else "Unknown", mapped_key=m.mapped_key, mapped_email=m.mapped_email, is_active=m.is_active )) return {"total": total, "items": result} @router.post("/me/change-password", summary="修改密码") def change_my_password( req: ChangePasswordRequest, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): if not security.verify_password(req.old_password, current_user.password_hash): raise HTTPException(status_code=400, detail="旧密码错误") current_user.password_hash = security.get_password_hash(req.new_password) db.add(current_user) db.commit() return {"message": "密码修改成功"} @router.post("/exchange", response_model=TicketExchangeResponse, summary="票据交换") def exchange_ticket( req: TicketExchangeRequest, db: Session = Depends(deps.get_db), ): """ 源应用调用以获取目标应用的票据。 """ # 1. Verify Source App source_app = db.query(Application).filter(Application.app_id == req.app_id).first() if not source_app: raise HTTPException(status_code=404, detail="源应用未找到") # 2. Verify Signature params = { "app_id": req.app_id, "target_app_id": req.target_app_id, "user_mobile": req.user_mobile, "timestamp": req.timestamp, "sign": req.sign } # Use the stored secret to verify if not SignatureService.verify_signature(source_app.app_secret, params, req.sign): raise HTTPException(status_code=400, detail="签名无效") # 3. Verify User Existence (Optional: Do we trust source app completely? Usually yes if signed.) # But we need user_id to generate ticket. # We query by mobile. user = db.query(User).filter(User.mobile == req.user_mobile, User.is_deleted == 0).first() if not user: # If user doesn't exist, we might auto-create OR fail. # Requirement: "Returns redirect_url". # For simplicity, if user not found, we cannot map. raise HTTPException(status_code=404, detail="用户在 UAP 中未找到") # 4. Generate Ticket for Target App # Logic: The ticket allows the user to log in to Target App. ticket = TicketService.generate_ticket(user.id, req.target_app_id) # 5. Get Target App URL target_app = db.query(Application).filter(Application.app_id == req.target_app_id).first() if not target_app: raise HTTPException(status_code=404, detail="目标应用未找到") # Construct redirect URL # Assuming target app handles /callback?ticket=... # We use the first redirect_uri or notification_url or custom logic. # Simplicity: We return the ticket and let the Source App handle the redirect, # OR we return a full redirect URL if target_app has a base URL configured. # Let's assume redirect_uris is a JSON list. redirect_base = "" if target_app.redirect_uris: try: uris = json.loads(target_app.redirect_uris) if uris and len(uris) > 0: redirect_base = uris[0] except: pass if not redirect_base: # Fallback or error redirect_base = "http://unknown-target-url" full_redirect_url = f"{redirect_base}?ticket={ticket}" return { "ticket": ticket, "redirect_url": full_redirect_url } @router.post("/ticket/exchange", response_model=TicketExchangeResponse, summary="票据交换 (API)") def ticket_exchange_api( req: TicketExchangeRequest, db: Session = Depends(deps.get_db), ): """ 服务器对服务器 API:源应用请求目标应用的票据。 """ return exchange_ticket(req, db) @router.post("/sso-login", response_model=SsoLoginResponse, summary="SSO 登录 (简易模式)") def sso_login( req: SsoLoginRequest, db: Session = Depends(deps.get_db), current_user: Optional[User] = Depends(deps.get_current_active_user_optional), ): """ 简易 API 应用的 SSO 登录。 返回带有票据的重定向 URL。 支持: 1. 用户名 + 密码登录 2. 基于会话的自动登录(如果已登录) """ # 1. Verify App app = db.query(Application).filter(Application.app_id == req.app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if app.protocol_type != "SIMPLE_API": raise HTTPException(status_code=400, detail="SSO 登录仅支持简易 API 应用。OIDC 请使用标准流程。") user = None # 2. Try Session Login first if current_user: user = current_user # 3. If no session, try Credentials Login if not user and req.username and req.password: # Verify User Credentials user_query = db.query(User).filter(User.mobile == req.username, User.is_deleted == 0).first() if not user_query: # Check mapping mapping = db.query(AppUserMapping).filter( AppUserMapping.app_id == app.id, (AppUserMapping.mapped_key == req.username) | (AppUserMapping.mapped_email == req.username) ).first() if mapping: user_query = db.query(User).filter(User.id == mapping.user_id, User.is_deleted == 0).first() if user_query and security.verify_password(req.password, user_query.password_hash): user = user_query if not user: raise HTTPException(status_code=401, detail="认证失败") if user.status != "ACTIVE": raise HTTPException(status_code=400, detail="用户已禁用") # 4. Generate Ticket ticket = TicketService.generate_ticket(user.id, req.app_id) # 5. Get Redirect URL redirect_base = "" if app.redirect_uris: try: uris = json.loads(app.redirect_uris) if uris and len(uris) > 0: redirect_base = uris[0] except: pass if not redirect_base: raise HTTPException(status_code=400, detail="应用未配置重定向 URI") full_redirect_url = f"{redirect_base}?ticket={ticket}" return {"redirect_url": full_redirect_url} @router.post("/validate", response_model=TicketValidateResponse, summary="验证票据") def validate_ticket( req: TicketValidateRequest, db: Session = Depends(deps.get_db), ): """ 目标应用调用以消费票据。 """ # 1. Verify App app = db.query(Application).filter(Application.app_id == req.app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") # 2. Verify Signature params = { "ticket": req.ticket, "app_id": req.app_id, "timestamp": req.timestamp, "sign": req.sign } if not SignatureService.verify_signature(app.app_secret, params, req.sign): raise HTTPException(status_code=400, detail="签名无效") # 3. Consume Ticket ticket_data = TicketService.consume_ticket(req.ticket, req.app_id) if not ticket_data: return {"valid": False} user_id = ticket_data["user_id"] # 4. Get User Info & Mapping user = db.query(User).filter(User.id == user_id).first() mapping = db.query(AppUserMapping).filter( AppUserMapping.app_id == app.id, AppUserMapping.user_id == user_id ).first() mapped_key = mapping.mapped_key if mapping else None mapped_email = mapping.mapped_email if mapping else None return { "valid": True, "user_id": user.id, "mobile": user.mobile, "mapped_key": mapped_key, "mapped_email": mapped_email }