import csv
import io
import secrets
import string
from typing import List, Optional

import pandas as pd
from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi.responses import StreamingResponse
from sqlalchemy import desc, or_
from sqlalchemy.orm import Session

from app.api.v1 import deps
from app.core import security
from app.models.application import Application
from app.models.user import User
from app.models.mapping import AppUserMapping
from app.schemas.application import (
    ApplicationCreate,
    ApplicationUpdate,
    ApplicationResponse,
    ApplicationList,
    ApplicationSecretDisplay,
    ViewSecretRequest,
)
from app.schemas.mapping import (
    MappingList,
    MappingResponse,
    MappingCreate,
    MappingUpdate,
    MappingDelete,
    MappingPreviewResponse,
    MappingImportSummary,
    MappingStrategy,
)
from app.schemas.user import UserSyncRequest
from app.services.mapping_service import MappingService

router = APIRouter()


def generate_access_token():
    """Return a new random URL-safe access token (32 bytes of entropy)."""
    return secrets.token_urlsafe(32)


def generate_app_credentials():
    """Generate a fresh ``(app_id, app_secret)`` pair.

    Returns:
        A tuple ``(app_id, app_secret)`` where ``app_id`` is ``"app_"``
        followed by 16 random hex characters and ``app_secret`` is a
        32-character string of ASCII letters and digits.
    """
    app_id = "app_" + secrets.token_hex(8)
    alphabet = string.ascii_letters + string.digits
    app_secret = ''.join(secrets.choice(alphabet) for _ in range(32))
    return app_id, app_secret


def _get_app_or_404(db: Session, app_id: int, current_user: User) -> Application:
    """Load an application by primary key and enforce ownership.

    Raises:
        HTTPException: 404 if no application has this id; 403 if the caller
            is neither a ``SUPER_ADMIN`` nor the application's owner.
    """
    # NOTE(review): soft-deleted applications are still returned here, exactly
    # as in the per-endpoint lookups this helper replaces. Confirm whether
    # they should be treated as "not found" like in the list endpoint.
    app = db.query(Application).filter(Application.id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail="应用未找到")
    if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")
    return app


def _require_password(password: str, current_user: User) -> None:
    """Raise a 401 unless ``password`` matches the caller's stored hash."""
    if not security.verify_password(password, current_user.password_hash):
        raise HTTPException(status_code=401, detail="密码错误")


def _ensure_mapping_value_unused(db: Session, app_id: int, column, value, detail: str) -> None:
    """Raise a 400 with ``detail`` if any mapping of this app already has ``column == value``."""
    taken = db.query(AppUserMapping).filter(
        AppUserMapping.app_id == app_id,
        column == value,
    ).first()
    if taken:
        raise HTTPException(status_code=400, detail=detail)


@router.get("/", response_model=ApplicationList, summary="获取应用列表")
def read_apps(
    skip: int = 0,
    limit: int = 10,
    search: Optional[str] = None,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    List applications (paginated).

    Super admins see every application; other users only see the ones they
    own. ``search`` does a case-insensitive substring match on the
    application name or app_id. Soft-deleted applications are excluded.
    """
    # SQLAlchemy needs the `==` comparison to build the SQL expression.
    query = db.query(Application).filter(Application.is_deleted == False)  # noqa: E712

    if current_user.role != "SUPER_ADMIN":
        query = query.filter(Application.owner_id == current_user.id)

    if search:
        pattern = f"%{search}%"
        query = query.filter(
            or_(
                Application.app_name.ilike(pattern),
                Application.app_id.ilike(pattern),
            )
        )

    total = query.count()
    apps = query.order_by(desc(Application.id)).offset(skip).limit(limit).all()
    return {"total": total, "items": apps}


@router.post("/", response_model=ApplicationSecretDisplay, summary="创建应用")
def create_app(
    *,
    db: Session = Depends(deps.get_db),
    app_in: ApplicationCreate,
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Create a new application owned by the caller.

    Returns the generated app_id, app secret and access token.
    """
    app_id, app_secret = generate_app_credentials()
    access_token = generate_access_token()

    # The secret is stored in plain text because it is needed for HMAC
    # verification.
    db_app = Application(
        app_id=app_id,
        app_secret=app_secret,
        access_token=access_token,
        app_name=app_in.app_name,
        icon_url=app_in.icon_url,
        protocol_type=app_in.protocol_type,
        redirect_uris=app_in.redirect_uris,
        notification_url=app_in.notification_url,
        owner_id=current_user.id,
    )
    db.add(db_app)
    db.commit()
    db.refresh(db_app)

    return ApplicationSecretDisplay(app_id=app_id, app_secret=app_secret, access_token=access_token)


@router.put("/{app_id}", response_model=ApplicationResponse, summary="更新应用")
def update_app(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    app_in: ApplicationUpdate,
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Update an application's editable fields.

    Only fields explicitly present in the request body are written.
    Responds 404 if the application does not exist and 403 if the caller is
    neither its owner nor a super admin.
    """
    app = _get_app_or_404(db, app_id, current_user)

    for field, value in app_in.model_dump(exclude_unset=True).items():
        setattr(app, field, value)

    db.commit()
    db.refresh(app)
    return app


@router.delete("/{app_id}", response_model=ApplicationResponse, summary="删除应用")
def delete_app(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Soft-delete an application by setting its ``is_deleted`` flag.

    Responds 404 if the application does not exist and 403 if the caller is
    neither its owner nor a super admin.
    """
    app = _get_app_or_404(db, app_id, current_user)
    app.is_deleted = True
    db.commit()
    return app


@router.post("/{app_id}/regenerate-secret", response_model=ApplicationSecretDisplay, summary="重新生成密钥")
def regenerate_secret(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Replace the application's secret with a newly generated one.

    The app_id and access token are left unchanged. Responds 404/403 as for
    the other application endpoints.
    """
    app = _get_app_or_404(db, app_id, current_user)

    # Only the secret half of the generated pair is used.
    _, new_secret = generate_app_credentials()
    app.app_secret = new_secret
    db.commit()

    return ApplicationSecretDisplay(app_id=app.app_id, app_secret=new_secret, access_token=app.access_token)


@router.post("/{app_id}/view-secret", response_model=ApplicationSecretDisplay, summary="查看密钥")
def view_secret(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    req: ViewSecretRequest,
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Return the application's credentials after re-checking the caller's password.

    Responds 401 on a wrong password, then 404/403 as for the other
    application endpoints.
    """
    # The password is verified before the application is looked up.
    _require_password(req.password, current_user)
    app = _get_app_or_404(db, app_id, current_user)

    return ApplicationSecretDisplay(app_id=app.app_id, app_secret=app.app_secret, access_token=app.access_token)


# ==========================================
# Mappings
# ==========================================

@router.get("/{app_id}/mappings", response_model=MappingList, summary="获取应用映射列表")
def read_mappings(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    skip: int = 0,
    limit: int = 10,
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    List the account mappings of an application (paginated, newest first).

    Responds 404/403 as for the other application endpoints.
    """
    _get_app_or_404(db, app_id, current_user)

    query = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id)
    total = query.count()
    mappings = query.order_by(desc(AppUserMapping.id)).offset(skip).limit(limit).all()

    # `user_mobile` is not a column of the mapping, so each response item is
    # built by hand from the related user (loaded on attribute access).
    items = [
        MappingResponse(
            id=m.id,
            app_id=m.app_id,
            user_id=m.user_id,
            mapped_key=m.mapped_key,
            mapped_email=m.mapped_email,
            user_mobile=m.user.mobile if m.user else "Deleted User",
        )
        for m in mappings
    ]
    return {"total": total, "items": items}


@router.post("/{app_id}/mappings", response_model=MappingResponse, summary="创建映射")
def create_mapping(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    mapping_in: MappingCreate,
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Manually create a mapping between a user (identified by mobile) and this app.

    If no non-deleted user has the given mobile, one is created with a random
    password, which is returned once in ``generated_password``.

    Responds 404/403 for the application, 401 on a wrong password, and 400
    when the user already has a mapping for this app or when the mapped
    email/key is already used within the app.
    """
    _get_app_or_404(db, app_id, current_user)
    _require_password(mapping_in.password, current_user)

    # Treat empty strings as None to avoid unique-constraint violations.
    mapped_key = mapping_in.mapped_key or None
    mapped_email = mapping_in.mapped_email or None

    user = db.query(User).filter(
        User.mobile == mapping_in.mobile,
        User.is_deleted == 0,
    ).first()

    # Run every validation before writing anything, so a rejected request
    # never leaves behind an auto-created user whose password nobody received.
    if user:
        existing = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == app_id,
            AppUserMapping.user_id == user.id,
        ).first()
        if existing:
            raise HTTPException(status_code=400, detail="该用户的映射已存在")

    if mapped_email:
        _ensure_mapping_value_unused(
            db, app_id, AppUserMapping.mapped_email, mapped_email,
            f"该应用下邮箱 {mapped_email} 已被使用",
        )

    if mapped_key:
        _ensure_mapping_value_unused(
            db, app_id, AppUserMapping.mapped_key, mapped_key,
            f"该应用下账号 {mapped_key} 已被使用",
        )

    new_user_created = user is None
    generated_password = None
    if new_user_created:
        generated_password = security.generate_alphanumeric_password(8)
        user = User(
            mobile=mapping_in.mobile,
            password_hash=security.get_password_hash(generated_password),
            status="ACTIVE",
            role="ORDINARY_USER",
        )
        db.add(user)
        # Flush (not commit) to obtain user.id; the user and the mapping are
        # then committed together in a single transaction below.
        db.flush()

    mapping = AppUserMapping(
        app_id=app_id,
        user_id=user.id,
        mapped_key=mapped_key,
        mapped_email=mapped_email,
    )
    db.add(mapping)
    db.commit()
    db.refresh(mapping)

    return MappingResponse(
        id=mapping.id,
        app_id=mapping.app_id,
        user_id=mapping.user_id,
        mapped_key=mapping.mapped_key,
        mapped_email=mapping.mapped_email,
        user_mobile=user.mobile,
        new_user_created=new_user_created,
        generated_password=generated_password,
    )


@router.put("/{app_id}/mappings/{mapping_id}", response_model=MappingResponse, summary="更新映射")
def update_mapping(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    mapping_id: int,
    mapping_in: MappingUpdate,
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Update the mapped key and/or mapped email of an existing mapping.

    A field left as ``None`` in the request is not touched. An empty string
    clears the field (stored as ``None``), matching how mappings are created.

    Responds 404/403 for the application, 401 on a wrong password, 404 if the
    mapping does not belong to this app, and 400 when the new key/email is
    already used within the app.
    """
    _get_app_or_404(db, app_id, current_user)
    _require_password(mapping_in.password, current_user)

    mapping = db.query(AppUserMapping).filter(
        AppUserMapping.id == mapping_id,
        AppUserMapping.app_id == app_id,
    ).first()
    if not mapping:
        raise HTTPException(status_code=404, detail="映射未找到")

    key_supplied = mapping_in.mapped_key is not None
    email_supplied = mapping_in.mapped_email is not None
    # Treat empty strings as None to avoid unique-constraint violations.
    new_key = mapping_in.mapped_key or None
    new_email = mapping_in.mapped_email or None

    # Uniqueness only needs checking for a non-empty value that changes.
    if new_key and new_key != mapping.mapped_key:
        _ensure_mapping_value_unused(
            db, app_id, AppUserMapping.mapped_key, new_key,
            f"该应用下账号 {new_key} 已被使用",
        )

    if new_email and new_email != mapping.mapped_email:
        _ensure_mapping_value_unused(
            db, app_id, AppUserMapping.mapped_email, new_email,
            f"该应用下邮箱 {new_email} 已被使用",
        )

    if key_supplied:
        mapping.mapped_key = new_key
    if email_supplied:
        mapping.mapped_email = new_email

    db.commit()
    db.refresh(mapping)

    return MappingResponse(
        id=mapping.id,
        app_id=mapping.app_id,
        user_id=mapping.user_id,
        mapped_key=mapping.mapped_key,
        mapped_email=mapping.mapped_email,
        user_mobile=mapping.user.mobile if mapping.user else "Deleted User",
        is_active=mapping.is_active,
    )


@router.delete("/{app_id}/mappings/{mapping_id}", summary="删除映射")
def delete_mapping(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    mapping_id: int,
    req: MappingDelete,
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Delete a mapping. The caller's password must be supplied.

    Responds 404/403 for the application, 401 on a wrong password, and 404 if
    the mapping does not belong to this app.
    """
    # Authorise against the application first: this avoids dereferencing a
    # missing application and does not reveal to non-owners whether a given
    # mapping id exists.
    _get_app_or_404(db, app_id, current_user)
    _require_password(req.password, current_user)

    mapping = db.query(AppUserMapping).filter(
        AppUserMapping.id == mapping_id,
        AppUserMapping.app_id == app_id,
    ).first()
    if not mapping:
        raise HTTPException(status_code=404, detail="映射未找到")

    db.delete(mapping)
    db.commit()
    return {"message": "删除成功"}

@router.get("/{app_id}/mappings/export", summary="导出映射")
def export_mappings(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Export all mappings of an application as an Excel (.xlsx) download.

    Responds 404 if the application does not exist and 403 if the caller is
    neither its owner nor a super admin.
    """
    app = db.query(Application).filter(Application.id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail="应用未找到")
    if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()

    columns = ['手机号', '映射账号', '映射邮箱']
    rows = [
        {
            '手机号': m.user.mobile if m.user else "Deleted User",
            '映射账号': m.mapped_key,
            '映射邮箱': m.mapped_email or '',
        }
        for m in mappings
    ]
    # Passing the columns explicitly keeps the header row even when there
    # are no mappings to export.
    df = pd.DataFrame(rows, columns=columns)

    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, index=False)
    output.seek(0)

    filename = f"mappings_app_{app_id}.xlsx"
    return StreamingResponse(
        output,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )


from fastapi import UploadFile, File  # noqa: E402  (kept next to the upload endpoints that use it)


@router.post("/{app_id}/mapping/preview", response_model=MappingPreviewResponse, summary="预览映射导入")
async def preview_mapping(
    app_id: int,
    file: UploadFile = File(...),
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Preview an Excel/CSV mapping import.

    Reads the uploaded file and delegates to ``MappingService.preview_import``.
    Responds 404/403 for the application as the other endpoints do.
    """
    # NOTE(review): this coroutine runs synchronous Session queries on the
    # event loop; confirm that is acceptable for the expected load.
    app = db.query(Application).filter(Application.id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail="应用未找到")
    if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    contents = await file.read()
    filename = file.filename
    return MappingService.preview_import(db, app_id, contents, filename)


@router.post("/{app_id}/mapping/import", response_model=MappingImportSummary, summary="执行映射导入")
async def import_mapping(
    app_id: int,
    file: UploadFile = File(...),
    strategy: MappingStrategy = MappingStrategy.SKIP,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """
    Execute a mapping import from an uploaded file.

    Reads the uploaded file and delegates to ``MappingService.execute_import``
    with the requested ``strategy`` (default ``MappingStrategy.SKIP``).
    Responds 404/403 for the application as the other endpoints do.
    """
    # NOTE(review): this coroutine runs synchronous Session queries on the
    # event loop; confirm that is acceptable for the expected load.
    app = db.query(Application).filter(Application.id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail="应用未找到")
    if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    contents = await file.read()
    filename = file.filename
    return MappingService.execute_import(db, app_id, contents, filename, strategy)


@router.post("/mapping/sync", response_model=MappingResponse, summary="同步映射 (M2M)")
def sync_mapping(
    *,
    db: Session = Depends(deps.get_db),
    sync_in: UserSyncRequest,
    current_app: Application = Depends(deps.get_current_app),
):
    """
    Synchronise a user mapping from an external platform (machine-to-machine).

    Creates or updates the mapping between the calling application and the
    user identified by ``sync_in.mobile``. If no user has that mobile, one is
    created with a random password that is not returned to the caller.
    Existing users are never modified.

    Requires an application access token (Authorization Bearer JWT or
    X-App-Access-Token).

    Responds 400 when the mapped key/email is already used by another mapping
    of the same application.
    """
    # Treat empty strings as None.
    mapped_key = sync_in.mapped_key if sync_in.mapped_key else None
    mapped_email = sync_in.mapped_email if sync_in.mapped_email else None

    # NOTE(review): unlike the manual create endpoint, this lookup does not
    # filter on User.is_deleted; confirm whether soft-deleted users should
    # be matched here.
    user = db.query(User).filter(User.mobile == sync_in.mobile).first()

    # A user that does not exist yet cannot have a mapping.
    mapping = None
    if user:
        mapping = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == current_app.id,
            AppUserMapping.user_id == user.id,
        ).first()

    # Run the uniqueness checks before writing anything, so a rejected sync
    # does not leave an auto-created user behind.
    if mapped_key and (not mapping or mapping.mapped_key != mapped_key):
        key_exists = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == current_app.id,
            AppUserMapping.mapped_key == mapped_key,
        ).first()
        if key_exists:
            raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")

    if mapped_email and (not mapping or mapping.mapped_email != mapped_email):
        email_exists = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == current_app.id,
            AppUserMapping.mapped_email == mapped_email,
        ).first()
        if email_exists:
            raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")

    if not user:
        password = security.generate_alphanumeric_password(8)
        user = User(
            mobile=sync_in.mobile,
            password_hash=security.get_password_hash(password),
            status="ACTIVE",
            role="ORDINARY_USER",
        )
        db.add(user)
        # Flush (not commit) to obtain user.id; the user and the mapping are
        # committed together below.
        db.flush()

    if mapping:
        # Only fields explicitly present in the request are overwritten.
        if sync_in.mapped_key is not None:
            mapping.mapped_key = mapped_key
        if sync_in.is_active is not None:
            mapping.is_active = sync_in.is_active
        if sync_in.mapped_email is not None:
            mapping.mapped_email = mapped_email
    else:
        mapping = AppUserMapping(
            app_id=current_app.id,
            user_id=user.id,
            mapped_key=mapped_key,
            mapped_email=mapped_email,
            is_active=sync_in.is_active if sync_in.is_active is not None else True,
        )
        db.add(mapping)

    db.commit()
    db.refresh(mapping)

    return MappingResponse(
        id=mapping.id,
        app_id=mapping.app_id,
        user_id=mapping.user_id,
        mapped_key=mapping.mapped_key,
        mapped_email=mapping.mapped_email,
        user_mobile=user.mobile,
        is_active=mapping.is_active,
    )