| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637 |
- import secrets
- import string
- import io
- import csv
- import pandas as pd
- from typing import List
- from fastapi import APIRouter, Depends, HTTPException, Response
- from fastapi.responses import StreamingResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import desc
- from app.api.v1 import deps
- from app.core import security
- from app.models.application import Application
- from app.models.user import User
- from app.models.mapping import AppUserMapping
- from app.schemas.application import (
- ApplicationCreate,
- ApplicationUpdate,
- ApplicationResponse,
- ApplicationList,
- ApplicationSecretDisplay,
- ViewSecretRequest
- )
- from app.schemas.mapping import (
- MappingList,
- MappingResponse,
- MappingCreate,
- MappingUpdate,
- MappingDelete,
- MappingPreviewResponse,
- MappingImportSummary,
- MappingStrategy
- )
- from app.schemas.user import UserSyncRequest
- from app.services.mapping_service import MappingService
- router = APIRouter()  # Router on which the application and mapping endpoints in this module are registered.
def generate_access_token() -> str:
    """Return a new URL-safe access token derived from 32 random bytes."""
    token = secrets.token_urlsafe(32)
    return token
def generate_app_credentials() -> tuple:
    """Create a fresh ``(app_id, app_secret)`` pair.

    Returns:
        A 2-tuple. The app ID is the literal prefix ``app_`` followed by
        16 random hexadecimal characters (20 characters in total). The
        secret is 32 characters drawn at random from ASCII letters and
        digits.
    """
    identifier = f"app_{secrets.token_hex(8)}"
    secret_alphabet = string.ascii_letters + string.digits
    secret = "".join([secrets.choice(secret_alphabet) for _ in range(32)])
    return identifier, secret
@router.get("/", response_model=ApplicationList, summary="获取应用列表")
def read_apps(
    skip: int = 0,
    limit: int = 10,
    search: str = None,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """List applications with pagination.

    Soft-deleted applications are always excluded. Users whose role is not
    ``SUPER_ADMIN`` only see applications they own.

    Args:
        skip: Number of rows to skip.
        limit: Maximum number of rows to return.
        search: Optional substring matched case-insensitively against the
            application name and the app ID. The text is matched literally:
            ``%`` and ``_`` are escaped rather than treated as wildcards.
        db: Database session.
        current_user: Authenticated user issuing the request.

    Returns:
        A dict with ``total`` (matching row count before pagination) and
        ``items`` (the requested page, newest ID first).
    """
    from sqlalchemy import or_

    query = db.query(Application).filter(Application.is_deleted == False)  # noqa: E712

    if current_user.role != "SUPER_ADMIN":
        query = query.filter(Application.owner_id == current_user.id)

    if search:
        # Escape LIKE metacharacters so user input is matched literally.
        # Generated app IDs contain "_" ("app_..."), which would otherwise
        # match any single character. "/" is used as the escape character
        # (and is itself escaped first) to avoid backslash quoting
        # differences between database backends.
        escaped = (
            search.replace("/", "//").replace("%", "/%").replace("_", "/_")
        )
        pattern = f"%{escaped}%"
        query = query.filter(
            or_(
                Application.app_name.ilike(pattern, escape="/"),
                Application.app_id.ilike(pattern, escape="/"),
            )
        )

    total = query.count()
    apps = query.order_by(desc(Application.id)).offset(skip).limit(limit).all()
    return {"total": total, "items": apps}
@router.post("/", response_model=ApplicationSecretDisplay, summary="创建应用")
def create_app(
    *,
    db: Session = Depends(deps.get_db),
    app_in: ApplicationCreate,
    current_user: User = Depends(deps.get_current_active_user),
):
    """Create a new application owned by the current user.

    A fresh app ID, app secret and access token are generated, stored on
    the new ``Application`` row together with the submitted settings, and
    returned to the caller in plain text.

    Args:
        db: Database session.
        app_in: Application settings supplied by the client.
        current_user: Authenticated user; becomes the application's owner.

    Returns:
        ``ApplicationSecretDisplay`` carrying the generated credentials.
    """
    new_app_id, new_secret = generate_app_credentials()
    new_token = generate_access_token()

    # The secret is persisted as generated (not hashed); per the original
    # author's note, the plain text is needed for HMAC verification.
    columns = {
        "app_id": new_app_id,
        "app_secret": new_secret,
        "access_token": new_token,
        "app_name": app_in.app_name,
        "icon_url": app_in.icon_url,
        "protocol_type": app_in.protocol_type,
        "redirect_uris": app_in.redirect_uris,
        "notification_url": app_in.notification_url,
        "owner_id": current_user.id,
    }
    record = Application(**columns)

    db.add(record)
    db.commit()
    db.refresh(record)

    return ApplicationSecretDisplay(
        app_id=new_app_id,
        app_secret=new_secret,
        access_token=new_token,
    )
@router.put("/{app_id}", response_model=ApplicationResponse, summary="更新应用")
def update_app(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    app_in: ApplicationUpdate,
    current_user: User = Depends(deps.get_current_active_user),
):
    """Update the editable fields of an application.

    Only fields explicitly present in the request body are written.

    Args:
        db: Database session.
        app_id: Primary key of the application row (``Application.id``).
        app_in: Partial update payload.
        current_user: Authenticated user issuing the request.

    Returns:
        The refreshed ``Application`` row.

    Raises:
        HTTPException: 404 if no application has this ID; 403 if the caller
            is neither ``SUPER_ADMIN`` nor the owner.
    """
    target = db.query(Application).filter(Application.id == app_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="应用未找到")

    # Only the owner or a super administrator may modify the application.
    is_super_admin = current_user.role == "SUPER_ADMIN"
    if not is_super_admin and target.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    changes = app_in.model_dump(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.add(target)
    db.commit()
    db.refresh(target)
    return target
@router.delete("/{app_id}", response_model=ApplicationResponse, summary="删除应用")
def delete_app(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    current_user: User = Depends(deps.get_current_active_user),
):
    """Soft-delete an application by setting its ``is_deleted`` flag.

    Args:
        db: Database session.
        app_id: Primary key of the application row (``Application.id``).
        current_user: Authenticated user issuing the request.

    Returns:
        The ``Application`` row that was flagged as deleted.

    Raises:
        HTTPException: 404 if no application has this ID; 403 if the caller
            is neither ``SUPER_ADMIN`` nor the owner.
    """
    target = db.query(Application).filter(Application.id == app_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="应用未找到")

    # Only the owner or a super administrator may delete the application.
    is_super_admin = current_user.role == "SUPER_ADMIN"
    if not is_super_admin and target.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    # The row is kept; it is only marked as deleted.
    target.is_deleted = True
    db.add(target)
    db.commit()
    return target
@router.post("/{app_id}/regenerate-secret", response_model=ApplicationSecretDisplay, summary="重新生成密钥")
def regenerate_secret(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    current_user: User = Depends(deps.get_current_active_user),
):
    """Replace an application's secret with a newly generated one.

    The app ID and access token are left unchanged.

    Args:
        db: Database session.
        app_id: Primary key of the application row (``Application.id``).
        current_user: Authenticated user issuing the request.

    Returns:
        ``ApplicationSecretDisplay`` with the existing app ID and access
        token and the new secret.

    Raises:
        HTTPException: 404 if no application has this ID; 403 if the caller
            is neither ``SUPER_ADMIN`` nor the owner.
    """
    target = db.query(Application).filter(Application.id == app_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="应用未找到")

    # Only the owner or a super administrator may rotate the secret.
    is_super_admin = current_user.role == "SUPER_ADMIN"
    if not is_super_admin and target.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    # The helper also produces an app ID; only the secret half is used.
    rotated_secret = generate_app_credentials()[1]
    target.app_secret = rotated_secret

    db.add(target)
    db.commit()

    return ApplicationSecretDisplay(
        app_id=target.app_id,
        app_secret=rotated_secret,
        access_token=target.access_token,
    )
@router.post("/{app_id}/view-secret", response_model=ApplicationSecretDisplay, summary="查看密钥")
def view_secret(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    req: ViewSecretRequest,
    current_user: User = Depends(deps.get_current_active_user),
):
    """Return an application's stored credentials after re-checking the password.

    Args:
        db: Database session.
        app_id: Primary key of the application row (``Application.id``).
        req: Request body carrying the caller's password.
        current_user: Authenticated user issuing the request.

    Returns:
        ``ApplicationSecretDisplay`` with the app ID, secret and access token.

    Raises:
        HTTPException: 401 if the password does not match the caller's
            stored hash; 404 if no application has this ID; 403 if the
            caller is neither ``SUPER_ADMIN`` nor the owner.
    """
    # The password is verified before anything is looked up.
    password_ok = security.verify_password(req.password, current_user.password_hash)
    if not password_ok:
        raise HTTPException(status_code=401, detail="密码错误")

    target = db.query(Application).filter(Application.id == app_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="应用未找到")

    # Only the owner or a super administrator may read the credentials.
    is_super_admin = current_user.role == "SUPER_ADMIN"
    if not is_super_admin and target.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    return ApplicationSecretDisplay(
        app_id=target.app_id,
        app_secret=target.app_secret,
        access_token=target.access_token,
    )
- # ==========================================
- # Mappings
- # ==========================================
@router.get("/{app_id}/mappings", response_model=MappingList, summary="获取应用映射列表")
def read_mappings(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    skip: int = 0,
    limit: int = 10,
    current_user: User = Depends(deps.get_current_active_user),
):
    """List the account mappings of one application with pagination.

    Args:
        db: Database session.
        app_id: Primary key of the application row (``Application.id``).
        skip: Number of rows to skip.
        limit: Maximum number of rows to return.
        current_user: Authenticated user issuing the request.

    Returns:
        A dict with ``total`` (mapping count for the application) and
        ``items`` (the requested page as ``MappingResponse`` objects,
        newest ID first).

    Raises:
        HTTPException: 404 if no application has this ID; 403 if the caller
            is neither ``SUPER_ADMIN`` nor the owner.
    """
    target = db.query(Application).filter(Application.id == app_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="应用未找到")

    is_super_admin = current_user.role == "SUPER_ADMIN"
    if not is_super_admin and target.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    mapping_query = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id)
    total = mapping_query.count()
    page = (
        mapping_query.order_by(desc(AppUserMapping.id))
        .offset(skip)
        .limit(limit)
        .all()
    )

    # Responses are built by hand so that ``user_mobile`` is filled from the
    # related user; a mapping without a user gets a placeholder instead.
    items = [
        MappingResponse(
            id=row.id,
            app_id=row.app_id,
            user_id=row.user_id,
            mapped_key=row.mapped_key,
            mapped_email=row.mapped_email,
            user_mobile=row.user.mobile if row.user else "Deleted User",
        )
        for row in page
    ]

    return {"total": total, "items": items}
@router.post("/{app_id}/mappings", response_model=MappingResponse, summary="创建映射")
def create_mapping(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    mapping_in: MappingCreate,
    current_user: User = Depends(deps.get_current_active_user),
):
    """Manually create an account mapping for an application.

    The target user is looked up by mobile number among non-deleted users.
    If none exists, a new ``ORDINARY_USER`` account with a random
    8-character password is created, and that password is returned in the
    response.

    All validation runs before anything is written, and the new user (if
    any) is committed together with the mapping. A rejected request
    therefore never leaves behind a freshly created user whose generated
    password was not reported to anyone.

    Args:
        db: Database session.
        app_id: Primary key of the application row (``Application.id``).
        mapping_in: Mobile number, optional mapped key/email, and the
            caller's password for confirmation.
        current_user: Authenticated user issuing the request.

    Returns:
        ``MappingResponse`` for the new mapping, including
        ``new_user_created`` and, when a user was created,
        ``generated_password``.

    Raises:
        HTTPException: 404 if the application does not exist; 403 if the
            caller is neither ``SUPER_ADMIN`` nor the owner; 401 if the
            password is wrong; 400 if the user already has a mapping for
            this application, or the mapped email/key is already used
            within it.
    """
    app = db.query(Application).filter(Application.id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail="应用未找到")

    if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    if not security.verify_password(mapping_in.password, current_user.password_hash):
        raise HTTPException(status_code=401, detail="密码错误")

    # Treat empty strings as "not provided" so they are stored as NULL and
    # cannot collide on the per-application unique constraints.
    mapped_key = mapping_in.mapped_key or None
    mapped_email = mapping_in.mapped_email or None

    # 1. Look up the user (creation is deferred until validation passes).
    user = (
        db.query(User)
        .filter(User.mobile == mapping_in.mobile, User.is_deleted == 0)
        .first()
    )

    # 2. An existing user may already be mapped to this application.
    if user:
        existing = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == app_id,
            AppUserMapping.user_id == user.id
        ).first()
        if existing:
            raise HTTPException(status_code=400, detail="该用户的映射已存在")

    # 3. The mapped email must be unique within the application.
    if mapped_email:
        email_exists = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == app_id,
            AppUserMapping.mapped_email == mapped_email
        ).first()
        if email_exists:
            raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")

    # 4. The mapped key must be unique within the application.
    if mapped_key:
        key_exists = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == app_id,
            AppUserMapping.mapped_key == mapped_key
        ).first()
        if key_exists:
            raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")

    # 5. Create the user only now that the request is known to be valid.
    new_user_created = False
    generated_password = None
    if not user:
        generated_password = security.generate_alphanumeric_password(8)
        user = User(
            mobile=mapping_in.mobile,
            password_hash=security.get_password_hash(generated_password),
            status="ACTIVE",
            role="ORDINARY_USER"
        )
        db.add(user)
        # flush() assigns user.id without committing, so the user and the
        # mapping are persisted in a single transaction below.
        db.flush()
        new_user_created = True

    # 6. Create the mapping and commit everything at once.
    mapping = AppUserMapping(
        app_id=app_id,
        user_id=user.id,
        mapped_key=mapped_key,
        mapped_email=mapped_email
    )
    db.add(mapping)
    db.commit()
    db.refresh(mapping)

    return MappingResponse(
        id=mapping.id,
        app_id=mapping.app_id,
        user_id=mapping.user_id,
        mapped_key=mapping.mapped_key,
        mapped_email=mapping.mapped_email,
        user_mobile=user.mobile,
        new_user_created=new_user_created,
        generated_password=generated_password
    )
@router.put("/{app_id}/mappings/{mapping_id}", response_model=MappingResponse, summary="更新映射")
def update_mapping(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    mapping_id: int,
    mapping_in: MappingUpdate,
    current_user: User = Depends(deps.get_current_active_user),
):
    """Update the mapped key and/or mapped email of an existing mapping.

    A field left as ``None`` in the request is not touched. A field sent as
    an empty string clears the stored value: it is written as ``None``, in
    line with how mappings are created, so that cleared values cannot
    collide with each other on the per-application unique constraints.

    Args:
        db: Database session.
        app_id: Primary key of the application row (``Application.id``).
        mapping_id: Primary key of the mapping row.
        mapping_in: New values plus the caller's password for confirmation.
        current_user: Authenticated user issuing the request.

    Returns:
        ``MappingResponse`` describing the updated mapping.

    Raises:
        HTTPException: 404 if the application or the mapping does not
            exist; 403 if the caller is neither ``SUPER_ADMIN`` nor the
            owner; 401 if the password is wrong; 400 if the new key or
            email is already used within the application.
    """
    app = db.query(Application).filter(Application.id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail="应用未找到")

    if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    if not security.verify_password(mapping_in.password, current_user.password_hash):
        raise HTTPException(status_code=401, detail="密码错误")

    mapping = db.query(AppUserMapping).filter(
        AppUserMapping.id == mapping_id,
        AppUserMapping.app_id == app_id
    ).first()
    if not mapping:
        raise HTTPException(status_code=404, detail="映射未找到")

    # None means "leave unchanged"; "" means "clear" and is stored as None.
    key_supplied = mapping_in.mapped_key is not None
    email_supplied = mapping_in.mapped_email is not None
    new_key = mapping_in.mapped_key or None
    new_email = mapping_in.mapped_email or None

    # Uniqueness only needs checking for a non-empty value that differs
    # from what this mapping already holds.
    if new_key and new_key != mapping.mapped_key:
        key_exists = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == app_id,
            AppUserMapping.mapped_key == new_key
        ).first()
        if key_exists:
            raise HTTPException(status_code=400, detail=f"该应用下账号 {new_key} 已被使用")

    if new_email and new_email != mapping.mapped_email:
        email_exists = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == app_id,
            AppUserMapping.mapped_email == new_email
        ).first()
        if email_exists:
            raise HTTPException(status_code=400, detail=f"该应用下邮箱 {new_email} 已被使用")

    if key_supplied:
        mapping.mapped_key = new_key
    if email_supplied:
        mapping.mapped_email = new_email

    db.add(mapping)
    db.commit()
    db.refresh(mapping)

    return MappingResponse(
        id=mapping.id,
        app_id=mapping.app_id,
        user_id=mapping.user_id,
        mapped_key=mapping.mapped_key,
        mapped_email=mapping.mapped_email,
        user_mobile=mapping.user.mobile if mapping.user else "Deleted User",
        is_active=mapping.is_active
    )
@router.delete("/{app_id}/mappings/{mapping_id}", summary="删除映射")
def delete_mapping(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    mapping_id: int,
    req: MappingDelete,
    current_user: User = Depends(deps.get_current_active_user),
):
    """Permanently delete a mapping after re-checking the caller's password.

    The application is resolved and ownership is checked before the mapping
    is looked up. This way a missing application yields a 404 instead of an
    unhandled error, and callers who do not own the application cannot
    probe which mapping IDs exist.

    Args:
        db: Database session.
        app_id: Primary key of the application row (``Application.id``).
        mapping_id: Primary key of the mapping row.
        req: Request body carrying the caller's password.
        current_user: Authenticated user issuing the request.

    Returns:
        A dict with a success ``message``.

    Raises:
        HTTPException: 401 if the password is wrong; 404 if the application
            or the mapping does not exist; 403 if the caller is neither
            ``SUPER_ADMIN`` nor the owner.
    """
    if not security.verify_password(req.password, current_user.password_hash):
        raise HTTPException(status_code=401, detail="密码错误")

    app = db.query(Application).filter(Application.id == app_id).first()
    if not app:
        raise HTTPException(status_code=404, detail="应用未找到")

    if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    mapping = db.query(AppUserMapping).filter(
        AppUserMapping.id == mapping_id,
        AppUserMapping.app_id == app_id
    ).first()
    if not mapping:
        raise HTTPException(status_code=404, detail="映射未找到")

    db.delete(mapping)
    db.commit()
    return {"message": "删除成功"}
@router.get("/{app_id}/mappings/export", summary="导出映射")
def export_mappings(
    *,
    db: Session = Depends(deps.get_db),
    app_id: int,
    current_user: User = Depends(deps.get_current_active_user),
):
    """Export every mapping of an application as an Excel (.xlsx) download.

    The sheet has three columns: mobile number, mapped account and mapped
    email. When the application has no mappings, the sheet contains only
    the header row.

    Args:
        db: Database session.
        app_id: Primary key of the application row (``Application.id``).
        current_user: Authenticated user issuing the request.

    Returns:
        ``StreamingResponse`` with the workbook as an attachment named
        ``mappings_app_<app_id>.xlsx``.

    Raises:
        HTTPException: 404 if no application has this ID; 403 if the caller
            is neither ``SUPER_ADMIN`` nor the owner.
    """
    target = db.query(Application).filter(Application.id == app_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="应用未找到")

    is_super_admin = current_user.role == "SUPER_ADMIN"
    if not is_super_admin and target.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    all_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()

    # One dict per spreadsheet row; a mapping without a user gets a
    # placeholder in the mobile column.
    rows = [
        {
            '手机号': item.user.mobile if item.user else "Deleted User",
            '映射账号': item.mapped_key,
            '映射邮箱': item.mapped_email or '',
        }
        for item in all_mappings
    ]

    # An empty export still needs the header row, so the columns are named
    # explicitly when there is no data.
    if rows:
        frame = pd.DataFrame(rows)
    else:
        frame = pd.DataFrame(columns=['手机号', '映射账号', '映射邮箱'])

    # Build the workbook in memory and rewind before streaming it.
    buffer = io.BytesIO()
    with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
        frame.to_excel(writer, index=False)
    buffer.seek(0)

    filename = f"mappings_app_{app_id}.xlsx"
    disposition = {"Content-Disposition": f"attachment; filename={filename}"}

    return StreamingResponse(
        buffer,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers=disposition,
    )
- from fastapi import UploadFile, File
@router.post("/{app_id}/mapping/preview", response_model=MappingPreviewResponse, summary="预览映射导入")
async def preview_mapping(
    app_id: int,
    file: UploadFile = File(...),
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """Preview a mapping import from an uploaded Excel/CSV file.

    The uploaded bytes and file name are handed to
    ``MappingService.preview_import``, whose result is returned unchanged.

    Args:
        app_id: Primary key of the application row (``Application.id``).
        file: Uploaded spreadsheet.
        db: Database session.
        current_user: Authenticated user issuing the request.

    Raises:
        HTTPException: 404 if no application has this ID; 403 if the caller
            is neither ``SUPER_ADMIN`` nor the owner.
    """
    target = db.query(Application).filter(Application.id == app_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="应用未找到")

    is_super_admin = current_user.role == "SUPER_ADMIN"
    if not is_super_admin and target.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    payload = await file.read()
    return MappingService.preview_import(db, app_id, payload, file.filename)
@router.post("/{app_id}/mapping/import", response_model=MappingImportSummary, summary="执行映射导入")
async def import_mapping(
    app_id: int,
    file: UploadFile = File(...),
    strategy: MappingStrategy = MappingStrategy.SKIP,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_active_user),
):
    """Run a mapping import from an uploaded file.

    The uploaded bytes, file name and chosen strategy are handed to
    ``MappingService.execute_import``, whose result is returned unchanged.

    Args:
        app_id: Primary key of the application row (``Application.id``).
        file: Uploaded spreadsheet.
        strategy: Import strategy passed through to the service; defaults
            to ``MappingStrategy.SKIP``.
        db: Database session.
        current_user: Authenticated user issuing the request.

    Raises:
        HTTPException: 404 if no application has this ID; 403 if the caller
            is neither ``SUPER_ADMIN`` nor the owner.
    """
    target = db.query(Application).filter(Application.id == app_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="应用未找到")

    is_super_admin = current_user.role == "SUPER_ADMIN"
    if not is_super_admin and target.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="权限不足")

    payload = await file.read()
    return MappingService.execute_import(db, app_id, payload, file.filename, strategy)
@router.post("/mapping/sync", response_model=MappingResponse, summary="同步映射 (M2M)")
def sync_mapping(
    *,
    db: Session = Depends(deps.get_db),
    sync_in: UserSyncRequest,
    current_app: Application = Depends(deps.get_current_app),
):
    """Create or update a user mapping on behalf of an external application.

    This is a machine-to-machine endpoint: the calling application is
    resolved by the ``deps.get_current_app`` dependency rather than from a
    logged-in user.

    The user is looked up by mobile number. If no such user exists, a new
    ``ORDINARY_USER`` account with a random 8-character password is
    created; the password is not returned. An existing mapping between the
    user and the calling application is updated with the supplied fields;
    otherwise a new mapping is created.

    All validation runs before anything is written, and the new user (if
    any) is committed together with the mapping, so a rejected request
    does not leave behind a newly created user.

    Args:
        db: Database session.
        sync_in: Mobile number plus optional mapped key, mapped email and
            active flag.
        current_app: The authenticated calling application.

    Returns:
        ``MappingResponse`` describing the created or updated mapping.

    Raises:
        HTTPException: 400 if the mapped key or mapped email is already
            used by another mapping of the same application.
    """
    # Treat empty strings as "not provided" so they are stored as NULL.
    mapped_key = sync_in.mapped_key or None
    mapped_email = sync_in.mapped_email or None

    # 1. Look up the user (creation is deferred until validation passes).
    # NOTE(review): unlike the manual create-mapping endpoint in this
    # module, this lookup does not filter on User.is_deleted — confirm
    # whether soft-deleted users should be matched here.
    user = db.query(User).filter(User.mobile == sync_in.mobile).first()

    # 2. A user that does not exist yet cannot have a mapping.
    mapping = None
    if user:
        mapping = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == current_app.id,
            AppUserMapping.user_id == user.id
        ).first()

    # 3. A new or changed key must be unique within the application.
    if mapped_key and (not mapping or mapping.mapped_key != mapped_key):
        key_exists = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == current_app.id,
            AppUserMapping.mapped_key == mapped_key
        ).first()
        if key_exists:
            raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")

    # 4. A new or changed email must be unique within the application.
    if mapped_email and (not mapping or mapping.mapped_email != mapped_email):
        email_exists = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == current_app.id,
            AppUserMapping.mapped_email == mapped_email
        ).first()
        if email_exists:
            raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")

    # 5. Create the user only now that the request is known to be valid.
    if not user:
        password = security.generate_alphanumeric_password(8)
        user = User(
            mobile=sync_in.mobile,
            password_hash=security.get_password_hash(password),
            status="ACTIVE",
            role="ORDINARY_USER"
        )
        db.add(user)
        # flush() assigns user.id without committing, so the user and the
        # mapping are persisted in a single transaction below.
        db.flush()

    if mapping:
        # Update only the fields the caller actually sent; a field sent as
        # an empty string clears the stored value.
        if sync_in.mapped_key is not None:
            mapping.mapped_key = mapped_key
        if sync_in.is_active is not None:
            mapping.is_active = sync_in.is_active
        if sync_in.mapped_email is not None:
            mapping.mapped_email = mapped_email
    else:
        mapping = AppUserMapping(
            app_id=current_app.id,
            user_id=user.id,
            mapped_key=mapped_key,
            mapped_email=mapped_email,
            is_active=sync_in.is_active if sync_in.is_active is not None else True
        )

    db.add(mapping)
    db.commit()
    db.refresh(mapping)

    return MappingResponse(
        id=mapping.id,
        app_id=mapping.app_id,
        user_id=mapping.user_id,
        mapped_key=mapping.mapped_key,
        mapped_email=mapping.mapped_email,
        user_mobile=user.mobile,
        is_active=mapping.is_active
    )
|