| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758 |
- import secrets
- import string
- import io
- import csv
- import pandas as pd
- import logging
- import json
- from typing import List, Optional
- from datetime import datetime
- from fastapi import APIRouter, Depends, HTTPException, Response, UploadFile, File, Form, Query, Request
- from fastapi.responses import StreamingResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import desc, or_, func
- from app.api.v1 import deps
- from app.core import security
- from app.models.application import Application, ProtocolType
- from app.models.user import User, UserStatus
- from app.models.mapping import AppUserMapping
- from app.models.app_category import AppCategory
- from app.core.utils import generate_english_name, get_client_ip
- from app.schemas.application import (
- ApplicationCreate,
- ApplicationUpdate,
- ApplicationResponse,
- ApplicationList,
- ApplicationSecretDisplay,
- ViewSecretRequest,
- RegenerateSecretRequest,
- ApplicationTransferRequest,
- AppSyncRequest,
- CategoryStats,
- AppCategoryCreate,
- AppCategoryUpdate,
- AppCategoryResponse,
- BatchUpdateCategoryRequest
- )
- from app.schemas.mapping import (
- MappingList,
- MappingResponse,
- MappingCreate,
- MappingUpdate,
- MappingDelete,
- MappingPreviewResponse,
- MappingImportSummary,
- MappingStrategy,
- ImportLogResponse
- )
- from app.schemas.user import UserSyncRequest, UserSyncList
- from app.services.mapping_service import MappingService
- from app.services.sms_service import SmsService
- from app.services.log_service import LogService
- from app.services.hydra_service import hydra_service
- from app.schemas.operation_log import ActionType, OperationLogList, OperationLogResponse
- router = APIRouter()
- logger = logging.getLogger(__name__)
- def generate_access_token():
- return secrets.token_urlsafe(32)
- def generate_app_credentials():
- # Generate a random 16-char App ID (hex or alphanumeric)
- app_id = "app_" + secrets.token_hex(8)
- # Generate a strong 32-char App Secret
- alphabet = string.ascii_letters + string.digits
- app_secret = ''.join(secrets.choice(alphabet) for i in range(32))
- return app_id, app_secret
- @router.get("/", response_model=ApplicationList, summary="获取应用列表")
- def read_apps(
- skip: int = 0,
- limit: int = 10,
- search: str = None,
- category_id: int = None,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用列表(分页)。
- 超级管理员可以查看所有,开发者只能查看自己的应用。
-
- category_id:
- - None: 获取所有应用
- - 0: 获取未分类的应用(category_id IS NULL)
- - >0: 获取指定分类的应用
- """
- query = db.query(Application).filter(Application.is_deleted == False)
-
- if current_user.role != "SUPER_ADMIN":
- query = query.filter(Application.owner_id == current_user.id)
-
- # 按分类筛选
- if category_id is not None:
- if category_id == 0:
- # 未分类的应用
- query = query.filter(Application.category_id.is_(None))
- else:
- # 指定分类的应用
- query = query.filter(Application.category_id == category_id)
-
- if search:
- # Search by name or app_id
- query = query.filter(
- or_(
- Application.app_name.ilike(f"%{search}%"),
- Application.app_id.ilike(f"%{search}%")
- )
- )
-
- total = query.count()
- apps = query.order_by(desc(Application.id)).offset(skip).limit(limit).all()
-
- # 为每个应用添加 category_name
- items = []
- for app in apps:
- app_dict = {
- **{c.name: getattr(app, c.name) for c in app.__table__.columns},
- "category_name": app.category.name if app.category else None
- }
- items.append(ApplicationResponse(**app_dict))
-
- return {"total": total, "items": items}
- @router.get("/categories", response_model=List[CategoryStats], summary="获取所有应用分类")
- def get_categories(
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- search: str = None,
- ):
- """
- 获取所有应用分类及其统计信息。
- 包括:预设分类 + 已使用的自定义分类
- 超级管理员查看所有,开发者只查看自己应用使用的分类。
- """
- try:
- logger.info(f"[分类管理] 请求开始 - 用户ID: {current_user.id}, 角色: {current_user.role}, 搜索参数: {repr(search)}")
-
- # 获取预设分类
- preset_query = db.query(AppCategory).order_by(AppCategory.name)
- if search and search.strip():
- preset_query = preset_query.filter(AppCategory.name.ilike(f"%{search.strip()}%"))
- preset_categories = preset_query.all()
- logger.info(f"[分类管理] 预设分类数量: {len(preset_categories)}")
-
- # 统计每个预设分类的应用数量
- category_list = []
- for cat in preset_categories:
- query = db.query(func.count(Application.id)).filter(
- Application.is_deleted == False,
- Application.category_id == cat.id
- )
- if current_user.role != "SUPER_ADMIN":
- query = query.filter(Application.owner_id == current_user.id)
- app_count = query.scalar() or 0
-
- category_list.append(CategoryStats(
- category_id=cat.id,
- category_name=cat.name,
- app_count=app_count
- ))
-
- logger.info(f"[分类管理] 最终返回数据数量: {len(category_list)}")
- return category_list
-
- except Exception as e:
- logger.error(f"[分类管理] 发生未捕获的异常: {e}", exc_info=True)
- raise
- @router.get("/categories/{category_id}/apps", response_model=ApplicationList, summary="获取分类下的应用列表")
- def get_category_apps(
- category_id: int,
- skip: int = 0,
- limit: int = 100,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """获取指定分类下的所有应用"""
- category = db.query(AppCategory).filter(AppCategory.id == category_id).first()
- if not category:
- raise HTTPException(status_code=404, detail="分类不存在")
-
- query = db.query(Application).filter(
- Application.is_deleted == False,
- Application.category_id == category_id
- )
-
- if current_user.role != "SUPER_ADMIN":
- query = query.filter(Application.owner_id == current_user.id)
-
- total = query.count()
- apps = query.order_by(desc(Application.id)).offset(skip).limit(limit).all()
-
- # 为每个应用添加 category_name
- items = []
- for app in apps:
- app_dict = {
- **{c.name: getattr(app, c.name) for c in app.__table__.columns},
- "category_name": app.category.name if app.category else None
- }
- items.append(ApplicationResponse(**app_dict))
-
- return {"total": total, "items": items}
- @router.post("/categories/batch-update", summary="批量更新应用分类")
- def batch_update_app_category(
- req: BatchUpdateCategoryRequest,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """批量更新应用的分类"""
- if not req.password:
- raise HTTPException(status_code=400, detail="需要提供管理员密码")
-
- if not security.verify_password(req.password, current_user.password_hash):
- raise HTTPException(status_code=400, detail="密码错误")
-
- if not req.app_ids:
- raise HTTPException(status_code=400, detail="请选择要更新的应用")
-
- # 验证分类是否存在(如果提供了 category_id)
- if req.category_id:
- category = db.query(AppCategory).filter(AppCategory.id == req.category_id).first()
- if not category:
- raise HTTPException(status_code=404, detail="分类不存在")
-
- # 查询要更新的应用
- query = db.query(Application).filter(
- Application.id.in_(req.app_ids),
- Application.is_deleted == False
- )
-
- if current_user.role != "SUPER_ADMIN":
- query = query.filter(Application.owner_id == current_user.id)
-
- apps = query.all()
-
- if len(apps) != len(req.app_ids):
- raise HTTPException(status_code=403, detail="部分应用不存在或无权限")
-
- # 批量更新分类
- updated_count = 0
- for app in apps:
- app.category_id = req.category_id
- db.add(app)
- updated_count += 1
-
- db.commit()
-
- action = f"设置为分类ID {req.category_id}" if req.category_id else "移除分类"
- logger.info(f"批量更新应用分类成功: {updated_count}个应用 {action} (Operator: {current_user.mobile})")
- return {"message": f"成功更新 {updated_count} 个应用的分类", "count": updated_count}
- @router.delete("/categories/{category_id}", summary="删除分类")
- def delete_category(
- category_id: int,
- verification_code: str = Query(..., description="手机验证码"),
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 删除分类。
- 注意:必须先移出该分类下的所有应用才能删除分类。
- 需要手机验证码验证。
- """
- if not SmsService.verify_code(current_user.mobile, verification_code):
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
-
- category = db.query(AppCategory).filter(AppCategory.id == category_id).first()
- if not category:
- raise HTTPException(status_code=404, detail="分类不存在")
-
- # 检查是否有应用使用此分类
- apps_count = db.query(Application).filter(
- Application.is_deleted == False,
- Application.category_id == category_id
- ).count()
-
- if apps_count > 0:
- raise HTTPException(
- status_code=400,
- detail=f"无法删除:有 {apps_count} 个应用正在使用此分类。请先移出这些应用后再删除分类。"
- )
-
- db.delete(category)
- db.commit()
-
- logger.info(f"分类删除成功: {category.name} (Operator: {current_user.mobile})")
- return {"message": f"成功删除分类 '{category.name}'"}
- # ==========================================
- # Preset Category Management
- # ==========================================
- @router.get("/preset-categories", response_model=List[AppCategoryResponse], summary="获取预设分类列表")
- def get_preset_categories(
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """获取所有预设分类"""
- categories = db.query(AppCategory).order_by(AppCategory.name).all()
- return categories
- @router.post("/preset-categories", response_model=AppCategoryResponse, summary="创建预设分类")
- def create_preset_category(
- category_in: AppCategoryCreate,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """创建预设分类(仅超级管理员)"""
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
-
- # 检查分类名是否已存在
- existing = db.query(AppCategory).filter(AppCategory.name == category_in.name).first()
- if existing:
- raise HTTPException(status_code=400, detail=f"分类 '{category_in.name}' 已存在")
-
- category = AppCategory(**category_in.model_dump())
- db.add(category)
- db.commit()
- db.refresh(category)
-
- logger.info(f"预设分类创建成功: {category_in.name} (Operator: {current_user.mobile})")
- return category
- @router.put("/preset-categories/{category_id}", response_model=AppCategoryResponse, summary="更新预设分类")
- def update_preset_category(
- category_id: int,
- category_in: AppCategoryUpdate,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """更新预设分类(仅超级管理员,需要密码验证)"""
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
-
- # 密码验证
- if not category_in.password:
- raise HTTPException(status_code=400, detail="需要提供密码")
-
- from app.core import security
- if not security.verify_password(category_in.password, current_user.password_hash):
- logger.warning(f"预设分类更新失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
-
- category = db.query(AppCategory).filter(AppCategory.id == category_id).first()
- if not category:
- raise HTTPException(status_code=404, detail="预设分类未找到")
-
- # 如果更新名称,检查是否重复
- if category_in.name and category_in.name != category.name:
- existing = db.query(AppCategory).filter(AppCategory.name == category_in.name).first()
- if existing:
- raise HTTPException(status_code=400, detail=f"分类 '{category_in.name}' 已存在")
-
- update_data = category_in.model_dump(exclude_unset=True)
- # 移除密码字段,不保存到数据库
- update_data.pop('password', None)
-
- for field, value in update_data.items():
- setattr(category, field, value)
-
- db.commit()
- db.refresh(category)
-
- logger.info(f"预设分类更新成功: {category.name} (Operator: {current_user.mobile})")
- return category
- @router.delete("/preset-categories/{category_id}", summary="删除预设分类")
- def delete_preset_category(
- category_id: int,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """删除预设分类(仅超级管理员)"""
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
-
- category = db.query(AppCategory).filter(AppCategory.id == category_id).first()
- if not category:
- raise HTTPException(status_code=404, detail="预设分类未找到")
-
- # 检查是否有应用使用此分类
- apps_count = db.query(Application).filter(
- Application.category_id == category.id,
- Application.is_deleted == False
- ).count()
-
- if apps_count > 0:
- raise HTTPException(
- status_code=400,
- detail=f"无法删除:有 {apps_count} 个应用正在使用此分类"
- )
-
- db.delete(category)
- db.commit()
-
- logger.info(f"预设分类删除成功: {category.name} (Operator: {current_user.mobile})")
- return {"message": "删除成功"}
- @router.get("/{app_id}", response_model=ApplicationResponse, summary="获取单个应用详情")
- def read_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取单个应用详情。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- # 设置 category_name
- app_dict = {
- **{c.name: getattr(app, c.name) for c in app.__table__.columns},
- "category_name": app.category.name if app.category else None
- }
- return ApplicationResponse(**app_dict)
- @router.post("/", response_model=ApplicationSecretDisplay, summary="创建应用")
- def create_app(
- *,
- db: Session = Depends(deps.get_db),
- app_in: ApplicationCreate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 创建新应用。只会返回一次明文密钥。
- """
- # 1. Generate ID and Secret
- app_id, app_secret = generate_app_credentials()
- # 2. Generate Access Token
- access_token = generate_access_token()
-
- # 3. Store Secret (Plain text needed for HMAC verification)
-
- db_app = Application(
- app_id=app_id,
- app_secret=app_secret,
- access_token=access_token,
- app_name=app_in.app_name,
- icon_url=app_in.icon_url,
- protocol_type=app_in.protocol_type,
- redirect_uris=app_in.redirect_uris,
- notification_url=app_in.notification_url,
- description=app_in.description,
- category_id=app_in.category_id,
- owner_id=current_user.id # Assign owner
- )
-
- # 验证 category_id 是否存在
- if app_in.category_id:
- category = db.query(AppCategory).filter(AppCategory.id == app_in.category_id).first()
- if not category:
- raise HTTPException(status_code=400, detail=f"分类ID {app_in.category_id} 不存在")
-
- db.add(db_app)
- db.commit()
- db.refresh(db_app)
- logger.info(f"应用创建成功: {app_in.app_name} (ID: {app_id}, Owner: {current_user.mobile})")
- # 如果是 OIDC 应用,自动在 Hydra 中创建 / 更新对应的 OAuth2 Client
- if db_app.protocol_type == ProtocolType.OIDC:
- try:
- raw = db_app.redirect_uris or ""
- redirect_uris: list[str] = []
- if raw:
- # 1. 优先按 JSON 解析(支持 ["url1","url2"] 或 "url1")
- try:
- parsed = json.loads(raw)
- if isinstance(parsed, list):
- redirect_uris = [str(u).strip() for u in parsed if str(u).strip()]
- elif isinstance(parsed, str):
- if parsed.strip():
- redirect_uris = [parsed.strip()]
- except Exception:
- # 2. 非 JSON 时,支持逗号分隔或单个 URL
- parts = [u.strip() for u in raw.split(",") if u.strip()]
- if parts:
- redirect_uris = parts
- hydra_service.create_or_update_client(
- client_id=db_app.app_id,
- client_secret=db_app.app_secret,
- redirect_uris=redirect_uris,
- client_name=db_app.app_name or db_app.app_id,
- )
- logger.info(f"Hydra OIDC Client 已创建/更新: {db_app.app_id}, redirect_uris={redirect_uris}")
- except Exception as e:
- logger.exception(
- "应用创建成功,但在 Hydra 创建 OIDC Client 失败 (app_id=%s): %s",
- db_app.app_id,
- e,
- )
- # 如需强一致,可以在这里 raise HTTPException 中断创建流程
- return ApplicationSecretDisplay(app_id=app_id, app_secret=app_secret, access_token=access_token)
- @router.put("/{app_id}", response_model=ApplicationResponse, summary="更新应用")
- def update_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- app_in: ApplicationUpdate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 更新应用信息。需要手机验证码和密码验证。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Security Verification
- if not app_in.verification_code:
- raise HTTPException(status_code=400, detail="需要提供手机验证码")
- if not SmsService.verify_code(current_user.mobile, app_in.verification_code):
- logger.warning(f"应用更新失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
-
- update_data = app_in.model_dump(exclude_unset=True)
- # Remove security fields from update data
- update_data.pop('password', None)
- update_data.pop('verification_code', None)
-
- # 验证 category_id 是否存在
- if 'category_id' in update_data and update_data['category_id']:
- category = db.query(AppCategory).filter(AppCategory.id == update_data['category_id']).first()
- if not category:
- raise HTTPException(status_code=400, detail=f"分类ID {update_data['category_id']} 不存在")
- for field, value in update_data.items():
- setattr(app, field, value)
-
- db.add(app)
- db.commit()
- db.refresh(app)
- # 如果是 OIDC 应用,编辑后同步 Hydra 中的 OAuth2 Client
- if app.protocol_type == ProtocolType.OIDC:
- try:
- raw = app.redirect_uris or ""
- redirect_uris: list[str] = []
- if raw:
- # 1. 优先按 JSON 解析(支持 ["url1","url2"] 或 "url1")
- try:
- parsed = json.loads(raw)
- if isinstance(parsed, list):
- redirect_uris = [str(u).strip() for u in parsed if str(u).strip()]
- elif isinstance(parsed, str):
- if parsed.strip():
- redirect_uris = [parsed.strip()]
- except Exception:
- # 2. 非 JSON 时,支持逗号分隔或单个 URL
- parts = [u.strip() for u in raw.split(",") if u.strip()]
- if parts:
- redirect_uris = parts
- hydra_service.create_or_update_client(
- client_id=app.app_id,
- client_secret=app.app_secret,
- redirect_uris=redirect_uris,
- client_name=app.app_name or app.app_id,
- )
- logger.info(f"Hydra OIDC Client 已在编辑后同步: {app.app_id}, redirect_uris={redirect_uris}")
- except Exception as e:
- logger.exception(
- "应用编辑成功,但在 Hydra 同步 OIDC Client 失败 (app_id=%s): %s",
- app.app_id,
- e,
- )
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.UPDATE,
- details=update_data
- )
-
- logger.info(f"应用更新成功: {app.app_name} (ID: {app.app_id})")
- return app
- @router.delete("/{app_id}", response_model=ApplicationResponse, summary="删除应用")
- def delete_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 软删除应用。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- app.is_deleted = True
- db.add(app)
- db.commit()
-
- logger.info(f"应用删除成功: {app.app_name} (ID: {app.app_id}, Operator: {current_user.mobile})")
- return app
- @router.post("/{app_id}/regenerate-secret", response_model=ApplicationSecretDisplay, summary="重新生成密钥")
- def regenerate_secret(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: RegenerateSecretRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 重新生成应用密钥。需要手机验证码和密码验证。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- # Security Verification
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"重置密钥失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- if not SmsService.verify_code(current_user.mobile, req.verification_code):
- logger.warning(f"重置密钥失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- _, new_secret = generate_app_credentials()
- app.app_secret = new_secret
-
- db.add(app)
- db.commit()
-
- # Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.REGENERATE_SECRET,
- details={"message": "Regenerated App Secret"}
- )
- logger.info(f"应用密钥已重置: {app.app_name} (ID: {app.app_id})")
- return ApplicationSecretDisplay(app_id=app.app_id, app_secret=new_secret, access_token=app.access_token)
- @router.post("/{app_id}/view-secret", response_model=ApplicationSecretDisplay, summary="查看密钥")
- def view_secret(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: ViewSecretRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 查看应用密钥。需要验证用户密码。
- """
- # 1. Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"查看密钥失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
-
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- # Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.VIEW_SECRET,
- details={"message": "Viewed App Secret"}
- )
-
- logger.info(f"查看应用密钥: {app.app_name} (Operator: {current_user.mobile})")
- return ApplicationSecretDisplay(app_id=app.app_id, app_secret=app.app_secret, access_token=app.access_token)
- @router.post("/{app_id}/transfer", response_model=ApplicationResponse, summary="转让应用")
- def transfer_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: ApplicationTransferRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 将应用转让给其他开发者或超级管理员。
- 需要验证:目标用户手机号、当前用户密码、短信验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # 1. Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"转让应用失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- # 2. Verify SMS Code
- if not SmsService.verify_code(current_user.mobile, req.verification_code):
- logger.warning(f"转让应用失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- # 3. Verify Target User
- target_user = db.query(User).filter(User.mobile == req.target_mobile, User.is_deleted == 0).first()
- if not target_user:
- raise HTTPException(status_code=404, detail="目标用户不存在")
-
- if target_user.status != "ACTIVE":
- raise HTTPException(status_code=400, detail="目标用户状态不正常")
-
- if target_user.role not in ["DEVELOPER", "SUPER_ADMIN"]:
- raise HTTPException(status_code=400, detail="目标用户必须是开发者或超级管理员")
-
- if target_user.id == app.owner_id:
- raise HTTPException(status_code=400, detail="应用已归属于该用户")
- # 4. Transfer
- old_owner_id = app.owner_id
- app.owner_id = target_user.id
-
- db.add(app)
- db.commit()
- db.refresh(app)
-
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.TRANSFER,
- target_user_id=target_user.id,
- target_mobile=target_user.mobile,
- details={
- "old_owner_id": old_owner_id,
- "new_owner_id": target_user.id
- }
- )
-
- logger.info(f"应用转让成功: {app.app_name} 从 {current_user.mobile} 转让给 {target_user.mobile}")
-
- return app
- # ==========================================
- # Mappings
- # ==========================================
- @router.get("/{app_id}/mappings", response_model=MappingList, summary="获取应用映射列表")
- def read_mappings(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- skip: int = 0,
- limit: int = 10,
- search: Optional[str] = Query(None, description="按手机号、映射账号、映射邮箱模糊匹配"),
- mapping_is_active: Optional[bool] = Query(None, description="映射是否启用,不传不筛选"),
- user_status: Optional[str] = Query(None, description="统一认证账号状态:ACTIVE|PENDING|DISABLED|DELETED"),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用的账号映射列表。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- query = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id)
- if mapping_is_active is not None:
- query = query.filter(AppUserMapping.is_active == mapping_is_active)
- need_user_join = bool(search and search.strip()) or bool(user_status and user_status.strip())
- if need_user_join:
- query = query.outerjoin(User, AppUserMapping.user_id == User.id)
- if search and search.strip():
- term = f"%{search.strip()}%"
- query = query.filter(
- or_(
- AppUserMapping.mapped_key.ilike(term),
- AppUserMapping.mapped_email.ilike(term),
- User.mobile.ilike(term),
- )
- )
- if user_status and user_status.strip():
- st = user_status.strip().upper()
- if st == "DELETED":
- query = query.filter(User.id.is_(None))
- elif st in ("ACTIVE", "PENDING", "DISABLED"):
- query = query.filter(User.status == UserStatus[st])
- else:
- raise HTTPException(status_code=400, detail="无效的 user_status")
- total = query.count()
- mappings = query.order_by(desc(AppUserMapping.id)).offset(skip).limit(limit).all()
-
- # Enrich with user mobile (handled by ORM relation usually, but for Pydantic 'from_attributes')
- # We added `user_mobile` to MappingResponse, so we need to ensure it's populated.
- # The ORM `mapping.user` is lazy loaded, which is fine for sync code.
- result = []
- for m in mappings:
- result.append(MappingResponse(
- id=m.id,
- app_id=m.app_id,
- user_id=m.user_id,
- mapped_key=m.mapped_key,
- mapped_email=m.mapped_email,
- user_mobile=m.user.mobile if m.user else "Deleted User",
- user_status=m.user.status if m.user else "DELETED",
- is_active=m.is_active
- ))
-
- return {"total": total, "items": result}
- @router.post("/{app_id}/mappings", response_model=MappingResponse, summary="创建映射")
- def create_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_in: MappingCreate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 手动创建映射。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Verify Password
- if not security.verify_password(mapping_in.password, current_user.password_hash):
- logger.warning(f"创建映射失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- # Normalize input: treat empty strings as None to avoid unique constraint violations
- mapped_key = mapping_in.mapped_key if mapping_in.mapped_key else None
- mapped_email = mapping_in.mapped_email if mapping_in.mapped_email else None
- # 1. Find User or Create
- user = db.query(User).filter(User.mobile == mapping_in.mobile, User.is_deleted == 0).first()
- new_user_created = False
- generated_password = None
- if not user:
- # Auto create user
- password_plain = security.generate_alphanumeric_password(8) # Random password letters+digits
- random_suffix = security.generate_alphanumeric_password(6)
- user = User(
- mobile=mapping_in.mobile,
- password_hash=security.get_password_hash(password_plain),
- status="ACTIVE",
- role="ORDINARY_USER",
- name=f"用户{random_suffix}",
- english_name=mapped_key
- )
- db.add(user)
- db.commit()
- db.refresh(user)
- new_user_created = True
- generated_password = password_plain
- logger.info(f"自动创建用户: {user.mobile}")
- # 2. Check if mapping exists
- existing = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.user_id == user.id
- ).first()
- if existing:
- raise HTTPException(status_code=400, detail="该用户的映射已存在")
- # 3. Check Uniqueness for mapped_email (if provided)
- if mapped_email:
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_email == mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")
- # 4. Check Uniqueness for mapped_key
- if mapped_key:
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_key == mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")
-
- # 5. Create
- mapping = AppUserMapping(
- app_id=app_id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email
- )
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
-
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.MANUAL_ADD,
- target_user_id=user.id,
- target_mobile=user.mobile,
- details={
- "mapped_key": mapped_key,
- "mapped_email": mapped_email,
- "new_user_created": new_user_created
- }
- )
-
- logger.info(f"映射创建成功: App {app_id} -> User {user.mobile} ({mapped_key})")
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- is_active=mapping.is_active,
- new_user_created=new_user_created,
- generated_password=generated_password
- )
- @router.put("/{app_id}/mappings/{mapping_id}", response_model=MappingResponse, summary="更新映射")
- def update_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_id: int,
- mapping_in: MappingUpdate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 更新映射信息。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Verify Password
- if not security.verify_password(mapping_in.password, current_user.password_hash):
- logger.warning(f"更新映射失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.id == mapping_id,
- AppUserMapping.app_id == app_id
- ).first()
- if not mapping:
- raise HTTPException(status_code=404, detail="映射未找到")
- # Check Uniqueness for mapped_key
- if mapping_in.mapped_key is not None and mapping_in.mapped_key != mapping.mapped_key:
- if mapping_in.mapped_key:
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_key == mapping_in.mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapping_in.mapped_key} 已被使用")
- # Check Uniqueness for mapped_email
- if mapping_in.mapped_email is not None and mapping_in.mapped_email != mapping.mapped_email:
- if mapping_in.mapped_email:
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_email == mapping_in.mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapping_in.mapped_email} 已被使用")
- # Capture old values for logging
- old_key = mapping.mapped_key
- old_email = mapping.mapped_email
- old_is_active = mapping.is_active
- if mapping_in.mapped_key is not None:
- mapping.mapped_key = mapping_in.mapped_key
- if mapping_in.mapped_email is not None:
- mapping.mapped_email = mapping_in.mapped_email
- if mapping_in.is_active is not None:
- mapping.is_active = mapping_in.is_active
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.UPDATE,
- target_user_id=mapping.user_id,
- target_mobile=mapping.user.mobile if mapping.user else None,
- details={
- "old": {
- "mapped_key": old_key,
- "mapped_email": old_email,
- "is_active": old_is_active,
- },
- "new": {
- "mapped_key": mapping.mapped_key,
- "mapped_email": mapping.mapped_email,
- "is_active": mapping.is_active,
- },
- }
- )
-
- logger.info(f"映射更新成功: App {app_id} User {mapping.user.mobile if mapping.user else 'unknown'}")
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=mapping.user.mobile if mapping.user else "Deleted User",
- user_status=mapping.user.status if mapping.user else "DELETED",
- is_active=mapping.is_active
- )
- @router.delete("/{app_id}/mappings/{mapping_id}", summary="删除映射")
- def delete_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_id: int,
- req: MappingDelete,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 删除映射关系。需验证密码。
- """
- # Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"删除映射失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.id == mapping_id,
- AppUserMapping.app_id == app_id
- ).first()
- if not mapping:
- raise HTTPException(status_code=404, detail="映射未找到")
-
- app = db.query(Application).filter(Application.id == app_id).first()
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Capture for logging
- target_user_id = mapping.user_id
- target_mobile = mapping.user.mobile if mapping.user else None
-
- db.delete(mapping)
- db.commit()
-
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.DELETE,
- target_user_id=target_user_id,
- target_mobile=target_mobile,
- details={"mapping_id": mapping_id}
- )
-
- logger.info(f"映射删除成功: App {app_id}, Mapping {mapping_id}")
-
- return {"message": "删除成功"}
- @router.post("/{app_id}/sync-users", summary="同步所有用户")
- def sync_users_to_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 一键导入用户管理中的用户数据到应用映射中。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Get all active users
- users = db.query(User).filter(User.is_deleted == 0).all()
-
- # Get existing mappings (user_ids)
- existing_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
- mapped_user_ids = {m.user_id for m in existing_mappings}
-
- new_mappings = []
- logger.info(f"开始同步用户到应用 {app.app_name} (ID: {app_id})")
-
- for user in users:
- if user.id in mapped_user_ids:
- continue
-
- # Create mapping
- mapped_key = user.english_name if user.english_name else user.mobile
-
- mapping = AppUserMapping(
- app_id=app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=None,
- is_active=True
- )
- new_mappings.append(mapping)
- if new_mappings:
- try:
- db.bulk_save_objects(new_mappings)
- db.commit()
- logger.info(f"用户同步完成: 新增 {len(new_mappings)} 条映射")
- except Exception as e:
- db.rollback()
- logger.error(f"批量同步用户失败: {e}。尝试逐条插入。")
-
- # Fallback: try one by one
- success_count = 0
- for m in new_mappings:
- try:
- db.add(m)
- db.commit()
- success_count += 1
- except Exception as ex:
- db.rollback()
- logger.warning(f"单个用户映射失败 (User: {m.user_id}): {ex}")
-
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details={"message": "Sync all users (partial)", "attempted": len(new_mappings), "success": success_count}
- )
- return {"message": f"同步完成,成功 {success_count} 个,失败 {len(new_mappings) - success_count} 个 (可能是账号冲突)"}
-
- # Log success
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details={"message": "Sync all users", "count": len(new_mappings)}
- )
- return {"message": f"同步成功,新增 {len(new_mappings)} 个用户映射"}
- logger.info("用户同步: 没有需要同步的新用户")
- return {"message": "没有需要同步的用户"}
- @router.post("/{app_id}/sync-users-v2", summary="同步用户 (新版)")
- def sync_users_to_app_v2(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- sync_req: AppSyncRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 高级用户同步功能。
- 支持全量/部分同步,以及可选的默认邮箱初始化。
- 需要手机验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # 1. Verify SMS Code
- if not SmsService.verify_code(current_user.mobile, sync_req.verification_code):
- logger.warning(f"同步用户失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- # 2. Determine Target Users
- query = db.query(User).filter(User.is_deleted == 0)
-
- if sync_req.mode == "SELECTED":
- if not sync_req.user_ids:
- raise HTTPException(status_code=400, detail="请选择要同步的用户")
- query = query.filter(User.id.in_(sync_req.user_ids))
-
- users = query.all()
-
- if not users:
- return {"message": "没有找到可同步的用户"}
- # 3. Get existing mappings (user_ids) to skip
- existing_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
- mapped_user_ids = {m.user_id for m in existing_mappings}
-
- # Check if email domain is valid format if provided (simple check)
- if sync_req.init_email and not sync_req.email_domain:
- raise HTTPException(status_code=400, detail="开启邮箱初始化时必须填写域名")
- new_mappings = []
-
- for user in users:
- if user.id in mapped_user_ids:
- continue
-
- mapped_key = user.english_name if user.english_name else user.mobile
-
- mapped_email = None
- if sync_req.init_email and user.english_name:
- # Construct email
- domain = sync_req.email_domain.strip()
- if not domain.startswith("@"):
- domain = "@" + domain
- mapped_email = f"{user.english_name}{domain}"
-
- mapping = AppUserMapping(
- app_id=app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email,
- is_active=True
- )
- new_mappings.append(mapping)
- if not new_mappings:
- return {"message": "所有选中的用户均已存在映射,无需同步"}
- # 4. Insert
- logger.info(f"开始同步(v2)用户到应用 {app.app_name},计划新增 {len(new_mappings)} 条")
-
- success_count = 0
- fail_count = 0
-
- for m in new_mappings:
- try:
- # Additional check: uniqueness of mapped_key in this app
- db.add(m)
- db.commit()
- success_count += 1
- except Exception as e:
- db.rollback()
- fail_count += 1
- logger.warning(f"同步单个映射失败 (User: {m.user_id}): {e}")
-
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.SYNC,
- details={
- "mode": sync_req.mode,
- "init_email": sync_req.init_email,
- "total_attempted": len(new_mappings),
- "success": success_count,
- "failed": fail_count
- }
- )
-
- logger.info(f"同步(v2)完成。成功: {success_count}, 失败: {fail_count}")
- msg = f"同步完成。成功: {success_count},失败: {fail_count}"
- if fail_count > 0:
- msg += " (失败原因可能是账号或邮箱冲突)"
-
- return {"message": msg}
- @router.get("/{app_id}/mappings/export", summary="导出映射")
- def export_mappings(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 导出所有映射到 Excel (.xlsx)。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
-
- # Prepare data for DataFrame
- data = []
- for m in mappings:
- mobile = m.user.mobile if m.user else "Deleted User"
- data.append({
- '手机号': mobile,
- '映射账号': m.mapped_key,
- '映射邮箱': m.mapped_email or ''
- })
-
- # Create DataFrame
- df = pd.DataFrame(data)
-
- # If no data, create an empty DataFrame with columns
- if not data:
- df = pd.DataFrame(columns=['手机号', '映射账号', '映射邮箱'])
- # Write to Excel BytesIO
- output = io.BytesIO()
- with pd.ExcelWriter(output, engine='openpyxl') as writer:
- df.to_excel(writer, index=False)
-
- output.seek(0)
-
- filename = f"mappings_app_{app_id}.xlsx"
- logger.info(f"导出映射成功: App {app_id}, Count {len(mappings)}")
-
- return StreamingResponse(
- output,
- media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- headers={"Content-Disposition": f"attachment; filename={filename}"}
- )
- @router.post("/{app_id}/mapping/preview", response_model=MappingPreviewResponse, summary="预览映射导入")
- async def preview_mapping(
- app_id: int,
- file: UploadFile = File(...),
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 预览 Excel/CSV 映射导入。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- contents = await file.read()
- filename = file.filename
- try:
- return MappingService.preview_import(db, app_id, contents, filename)
- except Exception as e:
- logger.error(f"导入预览失败: {e}", exc_info=True)
- raise HTTPException(status_code=400, detail=f"解析文件失败: {str(e)}")
- @router.post("/send-import-verification-code", summary="发送导入验证码")
- def send_import_verification_code(
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 发送验证码给当前登录用户(用于敏感操作验证,如导入)。
- """
- SmsService.send_code(current_user.mobile)
- logger.info(f"发送导入验证码: {current_user.mobile}")
- return {"message": "验证码已发送"}
- @router.post("/{app_id}/mapping/import", response_model=ImportLogResponse, summary="执行映射导入")
- async def import_mapping(
- app_id: int,
- file: UploadFile = File(...),
- strategy: MappingStrategy = Form(MappingStrategy.SKIP),
- verification_code: str = Form(...),
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 执行映射导入操作。需要验证短信验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- contents = await file.read()
- filename = file.filename
-
- logger.info(f"开始执行映射导入: App {app_id}, File {filename}, Strategy {strategy}")
- try:
- result = MappingService.execute_import(db, app_id, contents, filename, strategy, current_user.mobile, verification_code)
- except Exception as e:
- logger.error(f"执行映射导入异常: {e}", exc_info=True)
- raise e
-
- # LOGGING
- # For import, we log the summary and the logs structure
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details=result.model_dump(mode='json') # Store full result including logs
- )
-
- logger.info(f"映射导入完成: 成功 {result.summary.inserted + result.summary.updated}, 失败 {result.summary.failed}")
-
- return result
- @router.get("/mapping/users", response_model=UserSyncList, summary="获取全量用户(M2M)")
- def get_all_users_m2m(
- *,
- db: Session = Depends(deps.get_db),
- skip: int = 0,
- limit: int = 100,
- current_app: Application = Depends(deps.get_current_app),
- ):
- """
- 开发者拉取全量用户接口。
- 仅返回:手机号、姓名、英文名。
- 需要应用访问令牌 (Authorization Bearer JWT 或 X-App-Access-Token)。
- """
- query = db.query(User).filter(User.is_deleted == 0)
-
- total = query.count()
- users = query.order_by(User.id).offset(skip).limit(limit).all()
-
- return {"total": total, "items": users}
- @router.post("/mapping/sync", response_model=MappingResponse, summary="同步映射 (M2M)")
- def sync_mapping(
- request: Request,
- *,
- db: Session = Depends(deps.get_db),
- sync_in: UserSyncRequest,
- current_app: Application = Depends(deps.get_current_app),
- ):
- """
- 从外部平台同步用户映射关系(机器对机器)。
- 支持增删改查:
- - UPSERT (默认): 创建或更新映射及用户;必须提供姓名;新建用户时英文名由姓名自动生成并去重。
- - DELETE: 仅删除应用与用户的映射关系,不删除用户。
- 需要应用访问令牌 (Authorization Bearer JWT 或 X-App-Access-Token)。
- """
- # Normalize input: treat empty strings as None (mobile / mapped_key 已在 Schema 中强制校验)
- mapped_key = sync_in.mapped_key
- mapped_email = sync_in.mapped_email if sync_in.mapped_email else None
- logger.info(f"收到 M2M 同步请求: App {current_app.app_id}, Mobile {sync_in.mobile}, Action {sync_in.sync_action}")
- # ==========================================
- # 1. Handle DELETE Action
- # ==========================================
- if sync_in.sync_action == "DELETE":
- # 查找用户
- user = db.query(User).filter(User.mobile == sync_in.mobile).first()
- if not user:
- # 用户不存在,无法删除映射,直接抛出404或视作成功
- logger.warning(f"M2M 删除失败: 用户 {sync_in.mobile} 不存在")
- raise HTTPException(status_code=404, detail="用户不存在")
- # 查找映射
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.user_id == user.id
- ).first()
- if not mapping:
- logger.warning(f"M2M 删除失败: 映射不存在 (User {sync_in.mobile})")
- raise HTTPException(status_code=404, detail="映射关系不存在")
- if mapping.mapped_key is None:
- raise HTTPException(
- status_code=400,
- detail="映射记录缺少外部账号,请在平台侧补全后再删除",
- )
- if mapping.mapped_key != mapped_key:
- raise HTTPException(status_code=400, detail="映射账号与平台记录不一致")
- # 构造返回数据(删除前快照,将状态置为 False)
- resp_data = MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- is_active=False # 标记为非活跃/已删除
- )
- # 执行物理删除 (只删映射,不删用户)
- db.delete(mapping)
- db.commit()
- # 记录日志
- LogService.create_log(
- db=db,
- app_id=current_app.id,
- operator_id=current_app.owner_id,
- action_type=ActionType.DELETE,
- target_user_id=user.id,
- target_mobile=user.mobile,
- ip_address=get_client_ip(request),
- details={
- "mapped_key": mapping.mapped_key,
- "action": "M2M_DELETE",
- "source": "M2M",
- "sync_action": "DELETE",
- },
- )
- logger.info(f"M2M 删除成功: {sync_in.mobile}")
- return resp_data
- # ==========================================
- # 2. Handle UPSERT Action (Existing Logic)
- # ==========================================
- in_name = (sync_in.name or "").strip()
- if not in_name:
- raise HTTPException(status_code=400, detail="同步操作必须提供姓名")
- # 0. Check Uniqueness for Name (Global Check)
- # We exclude the current user (by mobile) to allow updates to self without conflict
- name_conflict = db.query(User).filter(
- User.name == in_name,
- User.mobile != sync_in.mobile,
- ).first()
- if name_conflict:
- raise HTTPException(status_code=400, detail=f"姓名 '{in_name}' 已存在")
- # 1. Find User or Create
- user = db.query(User).filter(User.mobile == sync_in.mobile).first()
- new_user_created = False
-
- if not user:
- # Create New User — english_name 始终由姓名生成并去重(忽略请求中的 english_name)
- in_english_name = generate_english_name(in_name).strip()
- if not in_english_name:
- in_english_name = f"u{sync_in.mobile[-4:]}"
- original_base = in_english_name
- counter = 1
- while db.query(User).filter(
- User.english_name == in_english_name,
- User.is_deleted == 0,
- ).first():
- in_english_name = f"{original_base}{counter}"
- counter += 1
- # Auto create user
- password = security.generate_alphanumeric_password(8) # Random password letters+digits
-
- user = User(
- mobile=sync_in.mobile,
- password_hash=security.get_password_hash(password),
- status="ACTIVE",
- role="ORDINARY_USER",
- name=in_name,
- english_name=in_english_name
- )
- db.add(user)
- db.commit()
- db.refresh(user)
- new_user_created = True
- logger.info(f"M2M 自动创建用户: {sync_in.mobile}")
- else:
- # Update Existing User - 已有用户不允许修改 name、mobile、english_name
- # Check if trying to modify restricted fields
- if sync_in.mobile != user.mobile:
- raise HTTPException(status_code=400, detail="已有用户不允许修改手机号")
-
- if (user.name or "").strip() != in_name:
- raise HTTPException(status_code=400, detail="已有用户不允许修改姓名")
-
- # 已有用户的其他字段(如status等)可以更新,但当前M2M接口不涉及
- # 这里只处理映射关系的更新
- # 2. Handle Mapping
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.user_id == user.id
- ).first()
- # Check Uniqueness for mapped_key (if changing or new, and provided)
- if mapped_key and (not mapping or mapping.mapped_key != mapped_key):
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.mapped_key == mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")
- # Check Uniqueness for mapped_email (if changing or new, and provided)
- if mapped_email and (not mapping or mapping.mapped_email != mapped_email):
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.mapped_email == mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")
- new_mapping_created = False
- if mapping:
- # Update existing mapping
- mapping.mapped_key = mapped_key
- if sync_in.is_active is not None:
- mapping.is_active = sync_in.is_active
- if sync_in.mapped_email is not None:
- mapping.mapped_email = mapped_email
- else:
- # Create new mapping
- new_mapping_created = True
- mapping = AppUserMapping(
- app_id=current_app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email,
- is_active=sync_in.is_active if sync_in.is_active is not None else True
- )
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=current_app.id,
- operator_id=current_app.owner_id,
- action_type=ActionType.SYNC_M2M,
- target_user_id=user.id,
- target_mobile=user.mobile,
- ip_address=get_client_ip(request),
- details={
- "mapped_key": mapped_key,
- "mapped_email": mapped_email,
- "new_user_created": new_user_created,
- "new_mapping_created": new_mapping_created,
- "sync_action": "UPSERT",
- "source": "M2M",
- "is_active": mapping.is_active,
- },
- )
-
- logger.info(f"M2M 同步成功: {sync_in.mobile} (Mapping: {mapping.id})")
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- is_active=mapping.is_active
- )
- # ==========================================
- # Operation Logs
- # ==========================================
- @router.get("/{app_id}/logs", response_model=OperationLogList, summary="获取操作日志")
- def read_logs(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- skip: int = 0,
- limit: int = 20,
- action_type: ActionType = Query(None),
- keyword: str = Query(None, description="搜索手机号"),
- start_date: datetime = Query(None),
- end_date: datetime = Query(None),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用操作日志。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- total, logs = LogService.get_logs(
- db=db,
- app_id=app_id,
- skip=skip,
- limit=limit,
- action_type=action_type,
- keyword=keyword,
- start_date=start_date,
- end_date=end_date
- )
-
- result = []
- for log in logs:
- # Enrich operator mobile
- operator_mobile = log.operator.mobile if log.operator else "Unknown"
-
- result.append(OperationLogResponse(
- id=log.id,
- app_id=log.app_id,
- action_type=log.action_type,
- target_mobile=log.target_mobile,
- details=log.details,
- operator_id=log.operator_id,
- operator_mobile=operator_mobile,
- target_user_id=log.target_user_id,
- created_at=log.created_at
- ))
-
- return {"total": total, "items": result}
|