| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898 |
- from typing import Optional, List
- import json
- from datetime import timedelta
- import logging
- from fastapi import APIRouter, Depends, HTTPException, Body, Request
- from fastapi.responses import RedirectResponse
- from sqlalchemy.orm import Session
- from pydantic import BaseModel
- from urllib.parse import urlencode, urlparse, parse_qs, urlunparse
- from app.api.v1 import deps
- from app.core import security
- from app.core.config import settings
- from app.core.utils import generate_english_name, get_client_ip
- from app.core.cache import redis_client
- from app.models.user import User, UserRole, UserStatus
- from app.models.application import Application, ProtocolType
- from app.models.mapping import AppUserMapping
- from app.schemas.simple_auth import (
- TicketExchangeRequest, TicketExchangeResponse,
- TicketValidateRequest, TicketValidateResponse,
- PasswordLoginRequest, PasswordLoginResponse,
- SmsLoginRequest,
- UserRegisterRequest, AdminPasswordResetRequest, AdminPasswordResetResponse,
- ChangePasswordRequest, MyMappingsResponse, UserMappingResponse,
- UserPromoteRequest, SsoLoginRequest, SsoLoginResponse,
- LaunchpadAppsResponse, LaunchpadAppResponse
- )
- from app.services.signature_service import SignatureService
- from app.services.ticket_service import TicketService
- from app.services.log_service import LogService
- from app.services.login_log_service import LoginLogService
- from app.services.system_config_service import SystemConfigService
- from app.schemas.operation_log import ActionType
- from app.schemas.login_log import LoginLogCreate, LoginMethod, AuthType
# Router object that the endpoint decorators in this module register on.
- router = APIRouter()
# Module-level logger, named after this module.
- logger = logging.getLogger(__name__)
def _password_login_failure(
    db: Session,
    log_create: LoginLogCreate,
    reason: str,
    status_code: int,
    log_message: str,
) -> HTTPException:
    """Record a failed password-login attempt and build the HTTP error for it.

    Marks ``log_create`` as failed with ``reason``, stores it through
    ``LoginLogService.create_log``, writes ``log_message`` as a warning and
    returns (does not raise) an ``HTTPException`` carrying ``status_code`` and
    ``reason`` as its detail, so call sites read
    ``raise _password_login_failure(...)``.
    """
    log_create.is_success = 0
    log_create.failure_reason = reason
    LoginLogService.create_log(db, log_create)
    logger.warning(log_message)
    return HTTPException(status_code=status_code, detail=reason)


@router.post("/login", response_model=PasswordLoginResponse, summary="密码登录")
def login_with_password(
    req: PasswordLoginRequest,
    request: Request,
    db: Session = Depends(deps.get_db),
):
    """Log in with an identifier and a password.

    1. ``app_id`` supplied: application SSO login, responds with a ``ticket``.
    2. ``app_id`` missing: unified auth platform login, responds with an
       ``access_token``.

    Every attempt, successful or not, is written to the login log.

    Raises:
        HTTPException: 404 when the application or the user is not found,
            400 when the signature is invalid or the user is not active,
            401 when the password does not match.
    """
    # Trim the password once so that both login flows treat copy-paste
    # whitespace the same way. The registration and change-password endpoints
    # in this module trim before hashing as well. The signature check below
    # intentionally keeps using the raw value the caller signed.
    password = req.password.strip() if req.password else req.password
    client_ip = get_client_ip(request)
    user_agent = request.headers.get("user-agent")

    # --- Platform Login ---
    if not req.app_id:
        log_create = LoginLogCreate(
            mobile=req.identifier,
            ip_address=client_ip,
            login_method=LoginMethod.UNIFIED_PAGE,
            auth_type=AuthType.PASSWORD,
            user_agent=user_agent,
        )

        # Platform login only accepts the mobile number as identifier.
        user = (
            db.query(User)
            .filter(User.mobile == req.identifier, User.is_deleted == 0)
            .first()
        )
        if not user:
            raise _password_login_failure(
                db, log_create, "用户未找到", 404,
                f"平台登录失败: 用户 {req.identifier} 未找到",
            )

        log_create.user_id = user.id
        if not security.verify_password(password, user.password_hash):
            raise _password_login_failure(
                db, log_create, "密码错误", 401,
                f"平台登录失败: 用户 {user.mobile} 密码错误",
            )

        if user.status != UserStatus.ACTIVE:
            raise _password_login_failure(
                db, log_create, "用户已禁用", 400,
                f"平台登录失败: 用户 {user.mobile} 已被禁用",
            )

        # "Remember me" switches to the long-lived expiry setting.
        if req.remember_me:
            expire_minutes = settings.ACCESS_TOKEN_EXPIRE_MINUTES_LONG
        else:
            expire_minutes = settings.ACCESS_TOKEN_EXPIRE_MINUTES
        access_token = security.create_access_token(
            user.id,
            expires_delta=timedelta(minutes=expire_minutes),
            is_long_term=req.remember_me,
        )

        LoginLogService.create_log(db, log_create)
        logger.info(f"平台登录成功: 用户 {user.mobile} (ID: {user.id})")
        return {
            "access_token": access_token,
            "token_type": "bearer",
            "role": user.role,
        }

    # --- App SSO Login ---
    log_create = LoginLogCreate(
        mobile=req.identifier,
        ip_address=client_ip,
        # Assumes the application's own login page is what calls this endpoint.
        login_method=LoginMethod.CUSTOM_PAGE,
        auth_type=AuthType.PASSWORD,
        user_agent=user_agent,
    )

    # 1. Verify App
    app = db.query(Application).filter(Application.app_id == req.app_id).first()
    if not app:
        raise _password_login_failure(
            db, log_create, "应用未找到", 404,
            f"应用登录失败: 应用ID {req.app_id} 未找到",
        )

    # 2. Verify Signature (optional; only checked when both fields are sent)
    if req.sign and req.timestamp:
        params = {
            "app_id": req.app_id,
            "identifier": req.identifier,
            "password": req.password,
            "timestamp": req.timestamp,
            "sign": req.sign,
        }
        if not SignatureService.verify_signature(app.app_secret, params, req.sign):
            raise _password_login_failure(
                db, log_create, "签名无效", 400,
                f"应用登录失败: 应用 {req.app_id} 签名验证失败",
            )

    # 3. Find User: by mobile first, then through this app's account mapping
    user = (
        db.query(User)
        .filter(User.mobile == req.identifier, User.is_deleted == 0)
        .first()
    )
    if not user:
        mapping = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == app.id,
            (AppUserMapping.mapped_key == req.identifier)
            | (AppUserMapping.mapped_email == req.identifier),
        ).first()
        if mapping:
            user = (
                db.query(User)
                .filter(User.id == mapping.user_id, User.is_deleted == 0)
                .first()
            )

    if not user:
        raise _password_login_failure(
            db, log_create, "用户未找到", 404,
            f"应用登录失败: 用户 {req.identifier} 在应用 {req.app_id} 中未找到",
        )

    log_create.user_id = user.id
    if user.status != UserStatus.ACTIVE:
        raise _password_login_failure(
            db, log_create, "用户已禁用", 400,
            f"应用登录失败: 用户 {user.mobile} 已被禁用",
        )

    # 4. Verify Password
    if not security.verify_password(password, user.password_hash):
        raise _password_login_failure(
            db, log_create, "密码错误", 401,
            f"应用登录失败: 用户 {user.mobile} 密码验证失败 (App: {req.app_id})",
        )

    # 5. Generate Ticket (Self-Targeting)
    ticket = TicketService.generate_ticket(user.id, req.app_id)

    # The login log keeps AuthType.PASSWORD: the password is what authenticated
    # the user, the ticket is only the result.
    LoginLogService.create_log(db, log_create)
    logger.info(f"应用登录成功: 用户 {user.mobile} 获取 Ticket (App: {req.app_id})")
    return {"ticket": ticket}
def _sms_login_failure(
    db: Session,
    log_create: LoginLogCreate,
    reason: str,
    status_code: int,
    log_message: str,
) -> HTTPException:
    """Record a failed SMS-login attempt and build the HTTP error for it.

    Marks ``log_create`` as failed with ``reason``, stores it through
    ``LoginLogService.create_log``, writes ``log_message`` as a warning and
    returns (does not raise) an ``HTTPException`` carrying ``status_code`` and
    ``reason`` as its detail.
    """
    log_create.is_success = 0
    log_create.failure_reason = reason
    LoginLogService.create_log(db, log_create)
    logger.warning(log_message)
    return HTTPException(status_code=status_code, detail=reason)


def _sms_code_matches(stored_code, submitted_code) -> bool:
    """Return True when the cached verification code equals the submitted one.

    A missing or empty cached value never matches. A Redis client that is not
    configured to decode responses hands back ``bytes``; those are decoded so
    the comparison with the submitted string can succeed.
    """
    if not stored_code:
        return False
    if isinstance(stored_code, bytes):
        stored_code = stored_code.decode("utf-8", errors="replace")
    return stored_code == submitted_code


@router.post("/sms-login", response_model=PasswordLoginResponse, summary="短信验证码登录")
def login_with_sms(
    req: SmsLoginRequest,
    request: Request,
    db: Session = Depends(deps.get_db),
):
    """Log in with a mobile number and an SMS verification code.

    1. ``app_id`` supplied: application SSO login, responds with a ``ticket``.
    2. ``app_id`` missing: unified auth platform login, responds with an
       ``access_token``.

    The verification code is read from Redis under ``SMS:<mobile>`` and is
    deleted only after a successful login. Every attempt that gets past the
    feature switch is written to the login log.

    Raises:
        HTTPException: 403 when SMS login is switched off, 404 when the
            application or the user is not found, 400 when the signature or
            the code is invalid or the user is not active.
    """
    # 0. Feature switch: the endpoint is usable when either the PC or the
    #    mobile SMS-login setting holds the string "true".
    pc_enabled = SystemConfigService.get_config(db, "sms_login_pc_enabled")
    mobile_enabled = SystemConfigService.get_config(db, "sms_login_mobile_enabled")
    if pc_enabled != "true" and mobile_enabled != "true":
        logger.warning("短信登录尝试失败: 短信登录功能未开启")
        raise HTTPException(status_code=403, detail="短信登录功能未开启")

    client_ip = get_client_ip(request)
    user_agent = request.headers.get("user-agent")
    code_key = f"SMS:{req.mobile}"

    # --- Platform Login ---
    if not req.app_id:
        log_create = LoginLogCreate(
            mobile=req.mobile,
            ip_address=client_ip,
            login_method=LoginMethod.UNIFIED_PAGE,
            auth_type=AuthType.SMS,
            user_agent=user_agent,
        )

        # 1. Verify Code
        if not _sms_code_matches(redis_client.get(code_key), req.code):
            raise _sms_login_failure(
                db, log_create, "验证码错误或已过期", 400,
                f"平台短信登录失败: 手机号 {req.mobile} 验证码无效",
            )

        # 2. Find user
        user = (
            db.query(User)
            .filter(User.mobile == req.mobile, User.is_deleted == 0)
            .first()
        )
        if not user:
            raise _sms_login_failure(
                db, log_create, "用户未找到", 404,
                f"平台短信登录失败: 手机号 {req.mobile} 未注册",
            )

        log_create.user_id = user.id
        if user.status != UserStatus.ACTIVE:
            raise _sms_login_failure(
                db, log_create, "用户已禁用", 400,
                f"平台短信登录失败: 用户 {user.mobile} 已被禁用",
            )

        # 3. Generate JWT Access Token
        access_token = security.create_access_token(
            user.id,
            expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
        )

        # The code is single-use: drop it once the login has succeeded.
        redis_client.delete(code_key)

        LoginLogService.create_log(db, log_create)
        logger.info(f"平台短信登录成功: 用户 {user.mobile} (ID: {user.id})")
        return {
            "access_token": access_token,
            "token_type": "bearer",
            "role": user.role,
        }

    # --- App SSO Login ---
    log_create = LoginLogCreate(
        mobile=req.mobile,
        ip_address=client_ip,
        login_method=LoginMethod.CUSTOM_PAGE,
        auth_type=AuthType.SMS,
        user_agent=user_agent,
    )

    # 1. Verify App
    app = db.query(Application).filter(Application.app_id == req.app_id).first()
    if not app:
        raise _sms_login_failure(
            db, log_create, "应用未找到", 404,
            f"应用短信登录失败: 应用ID {req.app_id} 未找到",
        )

    # 2. Verify Signature (optional; only checked when both fields are sent)
    if req.sign and req.timestamp:
        params = {
            "app_id": req.app_id,
            "mobile": req.mobile,
            "code": req.code,
            "timestamp": req.timestamp,
            "sign": req.sign,
        }
        if not SignatureService.verify_signature(app.app_secret, params, req.sign):
            raise _sms_login_failure(
                db, log_create, "签名无效", 400,
                f"应用短信登录失败: 应用 {req.app_id} 签名无效",
            )

    # 3. Verify Code
    if not _sms_code_matches(redis_client.get(code_key), req.code):
        raise _sms_login_failure(
            db, log_create, "验证码错误或已过期", 400,
            f"应用短信登录失败: 手机号 {req.mobile} 验证码无效",
        )

    # 4. Find User
    user = (
        db.query(User)
        .filter(User.mobile == req.mobile, User.is_deleted == 0)
        .first()
    )
    if not user:
        raise _sms_login_failure(
            db, log_create, "用户未找到", 404,
            f"应用短信登录失败: 手机号 {req.mobile} 未注册",
        )

    log_create.user_id = user.id
    if user.status != UserStatus.ACTIVE:
        raise _sms_login_failure(
            db, log_create, "用户已禁用", 400,
            f"应用短信登录失败: 用户 {user.mobile} 已被禁用",
        )

    # 5. Generate Ticket (Self-Targeting)
    ticket = TicketService.generate_ticket(user.id, req.app_id)

    # The code is single-use: drop it once the login has succeeded.
    redis_client.delete(code_key)

    LoginLogService.create_log(db, log_create)
    logger.info(f"应用短信登录成功: 用户 {user.mobile} 获取 Ticket (App: {req.app_id})")
    return {"ticket": ticket}
@router.post("/register", response_model=PasswordLoginResponse, summary="用户注册")
def register_user(
    req: UserRegisterRequest,
    db: Session = Depends(deps.get_db),
):
    """Register a new user and sign them in right away.

    The account is always created as ``UserRole.ORDINARY_USER`` with status
    ``UserStatus.ACTIVE``. The response carries an access token for the new
    account.

    Raises:
        HTTPException: 400 when the password fails
            ``security.validate_password_strength`` or when the mobile number
            already belongs to a non-deleted user.
    """
    # Surrounding whitespace is removed before the strength check and hashing.
    if req.password:
        req.password = req.password.strip()

    password_ok = security.validate_password_strength(req.password)
    if not password_ok:
        raise HTTPException(status_code=400, detail="密码强度不足,必须包含字母和数字")

    mobile_owner = (
        db.query(User)
        .filter(User.mobile == req.mobile, User.is_deleted == 0)
        .first()
    )
    if mobile_owner:
        logger.info(f"用户注册失败: 手机号 {req.mobile} 已存在")
        raise HTTPException(status_code=400, detail="手机号已注册")

    account = User(
        mobile=req.mobile,
        name=req.name,
        english_name=generate_english_name(req.name),
        password_hash=security.get_password_hash(req.password),
        status=UserStatus.ACTIVE,
        # The role is forced; the request cannot choose it.
        role=UserRole.ORDINARY_USER,
    )
    db.add(account)
    db.commit()
    db.refresh(account)

    logger.info(f"用户注册成功: {req.mobile} (ID: {account.id})")

    # Auto-login: hand back a token for the account that was just created.
    response = {"access_token": security.create_access_token(account.id)}
    response["token_type"] = "bearer"
    response["role"] = account.role
    return response
@router.post("/admin/reset-password", response_model=AdminPasswordResetResponse, summary="管理员重置密码")
def admin_reset_password(
    req: AdminPasswordResetRequest,
    request: Request,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """Let a super admin reset another user's password.

    A random 8-character alphanumeric password is generated; only its hash is
    stored, and the plain value is returned in this response.

    Raises:
        HTTPException: 403 when the caller is not a super admin, 401 when the
            caller's own password does not verify, 404 when the target user
            does not exist.
    """
    if current_user.role != UserRole.SUPER_ADMIN:
        raise HTTPException(status_code=403, detail="权限不足")

    # The admin has to confirm the operation with their own password.
    admin_confirmed = security.verify_password(
        req.admin_password, current_user.password_hash
    )
    if not admin_confirmed:
        logger.warning(f"管理员重置密码失败: 管理员 {current_user.mobile} 密码验证错误")
        raise HTTPException(status_code=401, detail="管理员密码错误")

    account = db.query(User).filter(User.id == req.user_id).first()
    if not account:
        raise HTTPException(status_code=404, detail="用户未找到")

    # Letters and digits only.
    plain_password = security.generate_alphanumeric_password(8)
    account.password_hash = security.get_password_hash(plain_password)
    db.add(account)
    db.commit()

    # Audit trail for the reset.
    audit_entry = dict(
        db=db,
        operator_id=current_user.id,
        action_type=ActionType.RESET_PASSWORD,
        target_user_id=account.id,
        target_mobile=account.mobile,
        ip_address=get_client_ip(request),
        details={},
    )
    LogService.create_log(**audit_entry)

    logger.info(f"管理员重置用户密码成功: 目标用户 {account.mobile} (ID: {account.id})")
    return {"new_password": plain_password}
@router.post("/admin/promote", summary="提升用户角色")
def promote_user(
    req: UserPromoteRequest,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """Change a user's role to super admin or developer.

    Only a super admin may call this endpoint.

    Raises:
        HTTPException: 403 when the caller is not a super admin, 400 when the
            requested role is neither ``SUPER_ADMIN`` nor ``DEVELOPER``,
            404 when the target user does not exist.
    """
    if current_user.role != UserRole.SUPER_ADMIN:
        raise HTTPException(status_code=403, detail="权限不足")

    grantable_roles = [UserRole.SUPER_ADMIN, UserRole.DEVELOPER]
    if req.new_role not in grantable_roles:
        raise HTTPException(status_code=400, detail="只能提升为管理员 or 开发者")

    account = db.query(User).filter(User.id == req.user_id).first()
    if not account:
        raise HTTPException(status_code=404, detail="用户未找到")

    # Keep the previous role for the log line written after the commit.
    previous_role = account.role
    account.role = req.new_role
    db.add(account)
    db.commit()

    logger.info(f"用户角色变更: 用户 {account.mobile} 从 {previous_role} 变更为 {req.new_role} (操作者: {current_user.mobile})")

    return {"message": "success"}
@router.get("/me/mappings", response_model=MyMappingsResponse, summary="我的映射")
def get_my_mappings(
    skip: int = 0,
    limit: int = 10,
    app_name: Optional[str] = None,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """List the current user's application account mappings, newest first.

    Args:
        skip: Number of rows to skip (pagination offset).
        limit: Maximum number of rows to return.
        app_name: Optional case-insensitive substring filter on the
            application name.

    Returns:
        A dict with ``total`` (number of matching mappings before pagination)
        and ``items`` (the requested page).
    """
    query = (
        db.query(AppUserMapping)
        .join(Application)
        .filter(AppUserMapping.user_id == current_user.id)
    )
    if app_name:
        query = query.filter(Application.app_name.ilike(f"%{app_name}%"))

    # Count before pagination so the client can page through all matches.
    total = query.count()
    mappings = (
        query.order_by(AppUserMapping.id.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )

    # A mapping without a loaded application falls back to placeholder values.
    items = [
        UserMappingResponse(
            app_name=m.application.app_name if m.application else "Unknown",
            app_id=m.application.app_id if m.application else "",
            protocol_type=m.application.protocol_type if m.application else "",
            mapped_key=m.mapped_key,
            mapped_email=m.mapped_email,
            is_active=m.is_active,
        )
        for m in mappings
    ]

    return {"total": total, "items": items}
@router.get("/me/launchpad-apps", response_model=LaunchpadAppsResponse, summary="快捷导航应用列表")
def get_launchpad_apps(
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """Return the current user's launchpad (quick navigation) applications.

    Each entry includes the application's description and category. Only
    active mappings to non-deleted applications whose protocol type is
    SIMPLE_API or OIDC are returned, ordered by category id and then by
    application name.
    """
    from app.models.app_category import AppCategory

    # Mappings of this user joined with their application and (optionally)
    # the application's category.
    launchpad_mappings = (
        db.query(AppUserMapping)
        .join(Application, AppUserMapping.app_id == Application.id)
        .outerjoin(AppCategory, Application.category_id == AppCategory.id)
        .filter(
            AppUserMapping.user_id == current_user.id,
            AppUserMapping.is_active == True,
            Application.protocol_type.in_([ProtocolType.SIMPLE_API, ProtocolType.OIDC]),
            Application.is_deleted == False
        )
        .order_by(Application.category_id.asc(), Application.app_name.asc())
        .all()
    )

    def to_item(mapping):
        """Build one response item; placeholders are used without an application."""
        application = mapping.application
        if not application:
            return LaunchpadAppResponse(
                app_name="Unknown",
                app_id="",
                protocol_type="",
                mapped_key=mapping.mapped_key,
                mapped_email=mapping.mapped_email,
                is_active=mapping.is_active,
                description=None,
                category_id=None,
                category_name=None,
            )
        return LaunchpadAppResponse(
            app_name=application.app_name,
            app_id=application.app_id,
            protocol_type=application.protocol_type.value,
            mapped_key=mapping.mapped_key,
            mapped_email=mapping.mapped_email,
            is_active=mapping.is_active,
            description=application.description,
            category_id=application.category_id,
            category_name=application.category.name if application.category else None,
        )

    items = [to_item(mapping) for mapping in launchpad_mappings]
    return {"total": len(items), "items": items}
@router.post("/me/change-password", summary="修改密码")
def change_my_password(
    req: ChangePasswordRequest,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """Change the current user's password after verifying the old one.

    Raises:
        HTTPException: 400 when the old password does not verify or when the
            new password fails ``security.validate_password_strength``.
    """
    old_password_ok = security.verify_password(
        req.old_password, current_user.password_hash
    )
    if not old_password_ok:
        logger.warning(f"用户修改密码失败: 用户 {current_user.mobile} 旧密码验证错误")
        raise HTTPException(status_code=400, detail="旧密码错误")

    # Surrounding whitespace is removed before the strength check and hashing.
    if req.new_password:
        req.new_password = req.new_password.strip()

    strong_enough = security.validate_password_strength(req.new_password)
    if not strong_enough:
        raise HTTPException(status_code=400, detail="密码强度不足,必须包含字母和数字")

    new_hash = security.get_password_hash(req.new_password)
    current_user.password_hash = new_hash
    db.add(current_user)
    db.commit()

    logger.info(f"用户修改密码成功: {current_user.mobile}")

    return {"message": "密码修改成功"}
def _exchange_redirect_base(raw_redirect_uris) -> str:
    """Pick the redirect base URL out of an application's ``redirect_uris``.

    The value is tried as JSON first: the first element of a non-empty list,
    or a bare JSON string, is used. A value that is not valid JSON is used as
    a plain string with surrounding whitespace removed. Returns ``""`` when
    nothing usable is configured.
    """
    if not raw_redirect_uris:
        return ""
    try:
        uris = json.loads(raw_redirect_uris)
    except (json.JSONDecodeError, TypeError):
        # Not JSON: treat the stored value itself as the URL.
        return raw_redirect_uris.strip()
    if isinstance(uris, list) and len(uris) > 0:
        return uris[0]
    if isinstance(uris, str):
        return uris
    return ""


def _exchange_append_ticket(redirect_base, ticket) -> str:
    """Return ``redirect_base`` with a ``ticket`` query parameter set.

    Query parameters and a fragment already present on the URL are kept, so a
    callback such as ``https://host/cb?x=1`` becomes
    ``https://host/cb?x=1&ticket=...`` instead of gaining a second ``?``.
    """
    parsed = urlparse(str(redirect_base))
    query = parse_qs(parsed.query, keep_blank_values=True)
    query["ticket"] = [ticket]
    return urlunparse(parsed._replace(query=urlencode(query, doseq=True)))


@router.post("/exchange", response_model=TicketExchangeResponse, summary="票据交换")
def exchange_ticket(
    req: TicketExchangeRequest,
    db: Session = Depends(deps.get_db),
):
    """Called by a source application to obtain a ticket for a target application.

    The request must be signed with the source application's secret. The
    ticket is only generated after the user and the target application have
    both been validated, so a rejected request leaves no unused ticket behind.

    Returns:
        A dict with the ``ticket`` and a ``redirect_url`` built from the
        target application's first configured redirect URI.

    Raises:
        HTTPException: 404 when the source application, the user or the
            target application is not found, 400 when the signature is
            invalid or the user is not active.
    """
    # 1. Verify Source App
    source_app = db.query(Application).filter(Application.app_id == req.app_id).first()
    if not source_app:
        logger.warning(f"票据交换失败: 源应用 {req.app_id} 未找到")
        raise HTTPException(status_code=404, detail="源应用未找到")

    # 2. Verify Signature against the stored secret of the source app
    params = {
        "app_id": req.app_id,
        "target_app_id": req.target_app_id,
        "user_mobile": req.user_mobile,
        "timestamp": req.timestamp,
        "sign": req.sign,
    }
    if not SignatureService.verify_signature(source_app.app_secret, params, req.sign):
        logger.warning(f"票据交换失败: 源应用 {req.app_id} 签名无效")
        raise HTTPException(status_code=400, detail="签名无效")

    # 3. Resolve the user by mobile; the user id is needed for the ticket.
    #    Unknown users are rejected rather than auto-created.
    user = (
        db.query(User)
        .filter(User.mobile == req.user_mobile, User.is_deleted == 0)
        .first()
    )
    if not user:
        logger.warning(f"票据交换失败: 用户 {req.user_mobile} 未找到")
        raise HTTPException(status_code=404, detail="用户在 UAP 中未找到")

    # Same rule as the login endpoints: a disabled user gets no ticket.
    if user.status != UserStatus.ACTIVE:
        logger.warning(f"票据交换失败: 用户 {user.mobile} 已被禁用")
        raise HTTPException(status_code=400, detail="用户已禁用")

    # 4. Verify Target App before any ticket is created
    target_app = (
        db.query(Application)
        .filter(Application.app_id == req.target_app_id)
        .first()
    )
    if not target_app:
        logger.warning(f"票据交换失败: 目标应用 {req.target_app_id} 未找到")
        raise HTTPException(status_code=404, detail="目标应用未找到")

    # 5. Generate the ticket that lets the user log in to the target app
    ticket = TicketService.generate_ticket(user.id, req.target_app_id)

    # 6. Build the redirect URL from the target app's first redirect URI.
    #    The ticket is returned as well, so the source app can build its own
    #    redirect if it prefers.
    redirect_base = _exchange_redirect_base(target_app.redirect_uris)
    if not redirect_base:
        # Placeholder kept for backward compatibility with existing callers.
        logger.warning(f"票据交换: 目标应用 {req.target_app_id} 未配置回调地址, 使用占位地址")
        redirect_base = "http://unknown-target-url"

    full_redirect_url = _exchange_append_ticket(redirect_base, ticket)
    logger.info(f"票据交换成功: 用户 {req.user_mobile} 从 {req.app_id} -> {req.target_app_id}")
    return {
        "ticket": ticket,
        "redirect_url": full_redirect_url,
    }
@router.post("/sso-login", response_model=SsoLoginResponse, summary="SSO 登录")
def sso_login(
    req: SsoLoginRequest,
    request: Request,
    db: Session = Depends(deps.get_db),
    current_user: Optional[User] = Depends(deps.get_current_active_user_optional),
):
    """SSO login entry point.

    Supported applications:
    - SIMPLE_API: responds with the application's callback URL carrying a ticket.
    - OIDC: responds with the callback address reduced to scheme, host and port.

    The frontend only has to navigate to the returned ``redirect_url``.
    An existing session (``current_user``) is preferred; otherwise the
    username/password from the request are verified.

    Raises:
        HTTPException: 404 when the application is not found, 400 when the
            protocol is unsupported, the user is not active or no redirect
            URI is configured, 401 when authentication fails, 500 for an
            unhandled protocol type.
    """
    # 1. Verify App
    app = db.query(Application).filter(Application.app_id == req.app_id).first()

    log_create = LoginLogCreate(
        ip_address=get_client_ip(request),
        login_method=LoginMethod.DIRECT_JUMP,
        auth_type=AuthType.SSO,
        user_agent=request.headers.get("user-agent"),
        mobile=req.username,
    )

    if not app:
        log_create.is_success = 0
        log_create.failure_reason = "应用未找到"
        LoginLogService.create_log(db, log_create)
        logger.warning(f"SSO登录失败: 应用 {req.app_id} 未找到")
        raise HTTPException(status_code=404, detail="应用未找到")

    # Only SIMPLE_API and OIDC are supported. Enum members are compared (as in
    # the launchpad query of this module) instead of raw strings.
    if app.protocol_type not in (ProtocolType.SIMPLE_API, ProtocolType.OIDC):
        log_create.is_success = 0
        log_create.failure_reason = "协议不支持"
        LoginLogService.create_log(db, log_create)
        logger.warning(
            f"SSO登录失败: 应用 {req.app_id} 协议类型不支持 ({app.protocol_type})"
        )
        raise HTTPException(status_code=400, detail="协议类型不支持该 SSO 登录方式")

    user = None

    # 2. Prefer the existing session
    if current_user:
        user = current_user
        log_create.user_id = user.id
        log_create.mobile = user.mobile
        log_create.auth_type = AuthType.TOKEN  # authenticated by the existing session

    # 3. No session: fall back to the credentials in the request
    if not user and req.username and req.password:
        log_create.auth_type = AuthType.PASSWORD
        candidate = (
            db.query(User)
            .filter(User.mobile == req.username, User.is_deleted == 0)
            .first()
        )
        if not candidate:
            # Not a mobile number: try this app's account mapping.
            mapping = db.query(AppUserMapping).filter(
                AppUserMapping.app_id == app.id,
                (AppUserMapping.mapped_key == req.username)
                | (AppUserMapping.mapped_email == req.username),
            ).first()
            if mapping:
                candidate = (
                    db.query(User)
                    .filter(User.id == mapping.user_id, User.is_deleted == 0)
                    .first()
                )

        if candidate and security.verify_password(req.password, candidate.password_hash):
            user = candidate
            log_create.user_id = user.id

    if not user:
        log_create.is_success = 0
        log_create.failure_reason = "认证失败"
        LoginLogService.create_log(db, log_create)
        logger.warning(f"SSO登录失败: 用户认证失败 (Username: {req.username})")
        raise HTTPException(status_code=401, detail="认证失败")

    if user.status != UserStatus.ACTIVE:
        log_create.is_success = 0
        log_create.failure_reason = "用户已禁用"
        LoginLogService.create_log(db, log_create)
        logger.warning(f"SSO登录失败: 用户 {user.mobile} 已被禁用")
        raise HTTPException(status_code=400, detail="用户已禁用")

    # 4. Resolve the redirect base (needed by both SIMPLE_API and OIDC)
    redirect_base = ""
    if app.redirect_uris:
        try:
            # Try a JSON array (or JSON string) first.
            uris = json.loads(app.redirect_uris)
            if isinstance(uris, list) and len(uris) > 0:
                redirect_base = uris[0]
            elif isinstance(uris, str):
                redirect_base = uris
        except (json.JSONDecodeError, TypeError):
            # Not JSON: use the stored value itself as the URL.
            redirect_base = app.redirect_uris.strip()

    if not redirect_base:
        logger.error(f"SSO登录配置错误: 应用 {req.app_id} 未配置回调地址")
        raise HTTPException(status_code=400, detail="应用未配置重定向 URI")

    # 5. Build the final redirect_url according to the protocol type
    if app.protocol_type == ProtocolType.SIMPLE_API:
        # 5.1 SIMPLE_API: generate a ticket and attach it to the callback URL.
        ticket = TicketService.generate_ticket(user.id, req.app_id)
        LoginLogService.create_log(db, log_create)
        logger.info(f"SSO登录成功: 用户 {user.mobile} 获取 Ticket (App: {req.app_id})")

        # Merge into the existing query string so a callback that already has
        # parameters or a fragment still yields a valid URL.
        parsed_callback = urlparse(str(redirect_base))
        callback_query = parse_qs(parsed_callback.query, keep_blank_values=True)
        callback_query["ticket"] = [ticket]
        full_redirect_url = urlunparse(
            parsed_callback._replace(query=urlencode(callback_query, doseq=True))
        )
        return {"redirect_url": full_redirect_url}

    if app.protocol_type == ProtocolType.OIDC:
        # 5.2 OIDC: jump to the callback host only (scheme + host + port),
        # dropping path, params, query and fragment, e.g.
        # https://api.hnyunzhu.com:9003/oauth_callback -> https://api.hnyunzhu.com:9003
        parsed_uri = urlparse(redirect_base)
        redirect_url = f"{parsed_uri.scheme}://{parsed_uri.netloc}"

        LoginLogService.create_log(db, log_create)
        logger.info(
            f"OIDC SSO 登录成功: 用户 {user.mobile} 将跳转到回调地址 (App: {req.app_id}, URL: {redirect_url})"
        )
        return {"redirect_url": redirect_url}

    # Should be unreachable because of the protocol check above; defensive.
    logger.error(
        f"SSO登录异常: 未处理的协议类型 {app.protocol_type} (App: {req.app_id})"
    )
    raise HTTPException(status_code=500, detail="未处理的协议类型")
@router.post("/validate", response_model=TicketValidateResponse, summary="验证票据")
def validate_ticket(
    req: TicketValidateRequest,
    db: Session = Depends(deps.get_db),
):
    """Called by the target application to consume a ticket.

    Returns:
        ``{"valid": False}`` when the ticket cannot be consumed or the user it
        refers to no longer exists; otherwise ``valid=True`` together with the
        user's id, mobile and this application's mapped key / email (``None``
        when no mapping exists).

    Raises:
        HTTPException: 404 when the application is not found, 400 when the
            signature is invalid.
    """
    # 1. Verify App
    app = db.query(Application).filter(Application.app_id == req.app_id).first()
    if not app:
        logger.warning(f"票据验证失败: 应用 {req.app_id} 未找到")
        raise HTTPException(status_code=404, detail="应用未找到")

    # 2. Verify Signature
    params = {
        "ticket": req.ticket,
        "app_id": req.app_id,
        "timestamp": req.timestamp,
        "sign": req.sign,
    }
    if not SignatureService.verify_signature(app.app_secret, params, req.sign):
        logger.warning(f"票据验证失败: 应用 {req.app_id} 签名无效")
        raise HTTPException(status_code=400, detail="签名无效")

    # 3. Consume Ticket
    ticket_data = TicketService.consume_ticket(req.ticket, req.app_id)
    if not ticket_data:
        logger.warning(f"票据验证失败: Ticket 无效或已过期 (App: {req.app_id})")
        return {"valid": False}
    user_id = ticket_data["user_id"]

    # 4. Get User Info & Mapping
    # The user may have been removed after the ticket was issued; report the
    # ticket as invalid instead of failing on a missing row.
    user = (
        db.query(User)
        .filter(User.id == user_id, User.is_deleted == 0)
        .first()
    )
    if not user:
        logger.warning(f"票据验证失败: 用户 {user_id} 不存在或已删除 (App: {req.app_id})")
        return {"valid": False}

    mapping = db.query(AppUserMapping).filter(
        AppUserMapping.app_id == app.id,
        AppUserMapping.user_id == user_id,
    ).first()

    mapped_key = mapping.mapped_key if mapping else None
    mapped_email = mapping.mapped_email if mapping else None

    logger.info(f"票据验证成功: 用户 {user.mobile} (App: {req.app_id})")
    return {
        "valid": True,
        "user_id": user.id,
        "mobile": user.mobile,
        "mapped_key": mapped_key,
        "mapped_email": mapped_email,
    }
@router.get("/sso/jump", summary="通知跳转 SSO")
def sso_jump(
    app_id: str,  # application ID
    redirect_to: str,  # final target page inside the application
    request: Request,
    db: Session = Depends(deps.get_db),
    current_user: Optional[User] = Depends(deps.get_current_active_user_optional),
):
    """SSO jump endpoint used by message notifications.

    A visitor without a session is redirected to the login page with the
    current URL as ``redirect``. A logged-in user is redirected to the
    application's callback URL with ``ticket`` and ``next`` (the requested
    ``redirect_to``) added to its query string.

    Raises:
        HTTPException: 404 when the application is not found, 400 when it has
            no callback URL configured.
    """
    # 1. The application must exist
    app = db.query(Application).filter(Application.app_id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail="应用未找到")

    # 2. Not logged in -> go to the unified login page.
    #    Assumes the frontend lives at HTTP_REFERER or a configured
    #    FRONTEND_HOST; a relative path is used for now.
    if not current_user:
        login_page = "/login"
        params = {"redirect": str(request.url)}
        return RedirectResponse(f"{login_page}?{urlencode(params)}")

    # 3. Resolve the application's callback URL before creating a ticket, so
    #    a misconfigured application does not leave an unused ticket behind.
    redirect_base = ""
    if app.redirect_uris:
        try:
            uris = json.loads(app.redirect_uris)
            if isinstance(uris, list) and len(uris) > 0:
                redirect_base = uris[0]
            elif isinstance(uris, str):
                redirect_base = uris
        except (json.JSONDecodeError, TypeError):
            # Not JSON: use the stored value itself as the URL.
            redirect_base = app.redirect_uris.strip()
    if not redirect_base:
        raise HTTPException(status_code=400, detail="应用未配置回调地址")

    # 4. Logged in -> generate the ticket
    ticket = TicketService.generate_ticket(current_user.id, app_id)

    # 5. Build the final URL, keeping parameters the callback already has
    parsed_uri = urlparse(redirect_base)
    query_params = parse_qs(parsed_uri.query, keep_blank_values=True)
    query_params["ticket"] = [ticket]
    query_params["next"] = [redirect_to]

    full_redirect_url = urlunparse((
        parsed_uri.scheme,
        parsed_uri.netloc,
        parsed_uri.path,
        parsed_uri.params,
        urlencode(query_params, doseq=True),
        parsed_uri.fragment,
    ))

    return RedirectResponse(full_redirect_url)
|