| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729 |
- import secrets
- import string
- import io
- import csv
- import pandas as pd
- import logging
- import json
- from typing import List, Optional
- from datetime import datetime
- from fastapi import APIRouter, Depends, HTTPException, Response, UploadFile, File, Form, Query, Request
- from fastapi.responses import StreamingResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import desc, or_, func
- from app.api.v1 import deps
- from app.core import security
- from app.models.application import Application, ProtocolType
- from app.models.user import User
- from app.models.mapping import AppUserMapping
- from app.models.app_category import AppCategory
- from app.core.utils import generate_english_name, get_client_ip
- from app.schemas.application import (
- ApplicationCreate,
- ApplicationUpdate,
- ApplicationResponse,
- ApplicationList,
- ApplicationSecretDisplay,
- ViewSecretRequest,
- RegenerateSecretRequest,
- ApplicationTransferRequest,
- AppSyncRequest,
- CategoryStats,
- AppCategoryCreate,
- AppCategoryUpdate,
- AppCategoryResponse,
- BatchUpdateCategoryRequest
- )
- from app.schemas.mapping import (
- MappingList,
- MappingResponse,
- MappingCreate,
- MappingUpdate,
- MappingDelete,
- MappingPreviewResponse,
- MappingImportSummary,
- MappingStrategy,
- ImportLogResponse
- )
- from app.schemas.user import UserSyncRequest, UserSyncList
- from app.services.mapping_service import MappingService
- from app.services.sms_service import SmsService
- from app.services.log_service import LogService
- from app.services.hydra_service import hydra_service
- from app.schemas.operation_log import ActionType, OperationLogList, OperationLogResponse
- router = APIRouter()
- logger = logging.getLogger(__name__)
- def generate_access_token():
- return secrets.token_urlsafe(32)
- def generate_app_credentials():
- # Generate a random 16-char App ID (hex or alphanumeric)
- app_id = "app_" + secrets.token_hex(8)
- # Generate a strong 32-char App Secret
- alphabet = string.ascii_letters + string.digits
- app_secret = ''.join(secrets.choice(alphabet) for i in range(32))
- return app_id, app_secret
- @router.get("/", response_model=ApplicationList, summary="获取应用列表")
- def read_apps(
- skip: int = 0,
- limit: int = 10,
- search: str = None,
- category_id: int = None,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用列表(分页)。
- 超级管理员可以查看所有,开发者只能查看自己的应用。
-
- category_id:
- - None: 获取所有应用
- - 0: 获取未分类的应用(category_id IS NULL)
- - >0: 获取指定分类的应用
- """
- query = db.query(Application).filter(Application.is_deleted == False)
-
- if current_user.role != "SUPER_ADMIN":
- query = query.filter(Application.owner_id == current_user.id)
-
- # 按分类筛选
- if category_id is not None:
- if category_id == 0:
- # 未分类的应用
- query = query.filter(Application.category_id.is_(None))
- else:
- # 指定分类的应用
- query = query.filter(Application.category_id == category_id)
-
- if search:
- # Search by name or app_id
- query = query.filter(
- or_(
- Application.app_name.ilike(f"%{search}%"),
- Application.app_id.ilike(f"%{search}%")
- )
- )
-
- total = query.count()
- apps = query.order_by(desc(Application.id)).offset(skip).limit(limit).all()
-
- # 为每个应用添加 category_name
- items = []
- for app in apps:
- app_dict = {
- **{c.name: getattr(app, c.name) for c in app.__table__.columns},
- "category_name": app.category.name if app.category else None
- }
- items.append(ApplicationResponse(**app_dict))
-
- return {"total": total, "items": items}
- @router.get("/categories", response_model=List[CategoryStats], summary="获取所有应用分类")
- def get_categories(
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- search: str = None,
- ):
- """
- 获取所有应用分类及其统计信息。
- 包括:预设分类 + 已使用的自定义分类
- 超级管理员查看所有,开发者只查看自己应用使用的分类。
- """
- try:
- logger.info(f"[分类管理] 请求开始 - 用户ID: {current_user.id}, 角色: {current_user.role}, 搜索参数: {repr(search)}")
-
- # 获取预设分类
- preset_query = db.query(AppCategory).order_by(AppCategory.name)
- if search and search.strip():
- preset_query = preset_query.filter(AppCategory.name.ilike(f"%{search.strip()}%"))
- preset_categories = preset_query.all()
- logger.info(f"[分类管理] 预设分类数量: {len(preset_categories)}")
-
- # 统计每个预设分类的应用数量
- category_list = []
- for cat in preset_categories:
- query = db.query(func.count(Application.id)).filter(
- Application.is_deleted == False,
- Application.category_id == cat.id
- )
- if current_user.role != "SUPER_ADMIN":
- query = query.filter(Application.owner_id == current_user.id)
- app_count = query.scalar() or 0
-
- category_list.append(CategoryStats(
- category_id=cat.id,
- category_name=cat.name,
- app_count=app_count
- ))
-
- logger.info(f"[分类管理] 最终返回数据数量: {len(category_list)}")
- return category_list
-
- except Exception as e:
- logger.error(f"[分类管理] 发生未捕获的异常: {e}", exc_info=True)
- raise
- @router.get("/categories/{category_id}/apps", response_model=ApplicationList, summary="获取分类下的应用列表")
- def get_category_apps(
- category_id: int,
- skip: int = 0,
- limit: int = 100,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """获取指定分类下的所有应用"""
- category = db.query(AppCategory).filter(AppCategory.id == category_id).first()
- if not category:
- raise HTTPException(status_code=404, detail="分类不存在")
-
- query = db.query(Application).filter(
- Application.is_deleted == False,
- Application.category_id == category_id
- )
-
- if current_user.role != "SUPER_ADMIN":
- query = query.filter(Application.owner_id == current_user.id)
-
- total = query.count()
- apps = query.order_by(desc(Application.id)).offset(skip).limit(limit).all()
-
- # 为每个应用添加 category_name
- items = []
- for app in apps:
- app_dict = {
- **{c.name: getattr(app, c.name) for c in app.__table__.columns},
- "category_name": app.category.name if app.category else None
- }
- items.append(ApplicationResponse(**app_dict))
-
- return {"total": total, "items": items}
- @router.post("/categories/batch-update", summary="批量更新应用分类")
- def batch_update_app_category(
- req: BatchUpdateCategoryRequest,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """批量更新应用的分类"""
- if not req.password:
- raise HTTPException(status_code=400, detail="需要提供管理员密码")
-
- if not security.verify_password(req.password, current_user.password_hash):
- raise HTTPException(status_code=400, detail="密码错误")
-
- if not req.app_ids:
- raise HTTPException(status_code=400, detail="请选择要更新的应用")
-
- # 验证分类是否存在(如果提供了 category_id)
- if req.category_id:
- category = db.query(AppCategory).filter(AppCategory.id == req.category_id).first()
- if not category:
- raise HTTPException(status_code=404, detail="分类不存在")
-
- # 查询要更新的应用
- query = db.query(Application).filter(
- Application.id.in_(req.app_ids),
- Application.is_deleted == False
- )
-
- if current_user.role != "SUPER_ADMIN":
- query = query.filter(Application.owner_id == current_user.id)
-
- apps = query.all()
-
- if len(apps) != len(req.app_ids):
- raise HTTPException(status_code=403, detail="部分应用不存在或无权限")
-
- # 批量更新分类
- updated_count = 0
- for app in apps:
- app.category_id = req.category_id
- db.add(app)
- updated_count += 1
-
- db.commit()
-
- action = f"设置为分类ID {req.category_id}" if req.category_id else "移除分类"
- logger.info(f"批量更新应用分类成功: {updated_count}个应用 {action} (Operator: {current_user.mobile})")
- return {"message": f"成功更新 {updated_count} 个应用的分类", "count": updated_count}
- @router.delete("/categories/{category_id}", summary="删除分类")
- def delete_category(
- category_id: int,
- verification_code: str = Query(..., description="手机验证码"),
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 删除分类。
- 注意:必须先移出该分类下的所有应用才能删除分类。
- 需要手机验证码验证。
- """
- if not SmsService.verify_code(current_user.mobile, verification_code):
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
-
- category = db.query(AppCategory).filter(AppCategory.id == category_id).first()
- if not category:
- raise HTTPException(status_code=404, detail="分类不存在")
-
- # 检查是否有应用使用此分类
- apps_count = db.query(Application).filter(
- Application.is_deleted == False,
- Application.category_id == category_id
- ).count()
-
- if apps_count > 0:
- raise HTTPException(
- status_code=400,
- detail=f"无法删除:有 {apps_count} 个应用正在使用此分类。请先移出这些应用后再删除分类。"
- )
-
- db.delete(category)
- db.commit()
-
- logger.info(f"分类删除成功: {category.name} (Operator: {current_user.mobile})")
- return {"message": f"成功删除分类 '{category.name}'"}
- # ==========================================
- # Preset Category Management
- # ==========================================
- @router.get("/preset-categories", response_model=List[AppCategoryResponse], summary="获取预设分类列表")
- def get_preset_categories(
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """获取所有预设分类"""
- categories = db.query(AppCategory).order_by(AppCategory.name).all()
- return categories
- @router.post("/preset-categories", response_model=AppCategoryResponse, summary="创建预设分类")
- def create_preset_category(
- category_in: AppCategoryCreate,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """创建预设分类(仅超级管理员)"""
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
-
- # 检查分类名是否已存在
- existing = db.query(AppCategory).filter(AppCategory.name == category_in.name).first()
- if existing:
- raise HTTPException(status_code=400, detail=f"分类 '{category_in.name}' 已存在")
-
- category = AppCategory(**category_in.model_dump())
- db.add(category)
- db.commit()
- db.refresh(category)
-
- logger.info(f"预设分类创建成功: {category_in.name} (Operator: {current_user.mobile})")
- return category
- @router.put("/preset-categories/{category_id}", response_model=AppCategoryResponse, summary="更新预设分类")
- def update_preset_category(
- category_id: int,
- category_in: AppCategoryUpdate,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """更新预设分类(仅超级管理员,需要密码验证)"""
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
-
- # 密码验证
- if not category_in.password:
- raise HTTPException(status_code=400, detail="需要提供密码")
-
- from app.core import security
- if not security.verify_password(category_in.password, current_user.password_hash):
- logger.warning(f"预设分类更新失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
-
- category = db.query(AppCategory).filter(AppCategory.id == category_id).first()
- if not category:
- raise HTTPException(status_code=404, detail="预设分类未找到")
-
- # 如果更新名称,检查是否重复
- if category_in.name and category_in.name != category.name:
- existing = db.query(AppCategory).filter(AppCategory.name == category_in.name).first()
- if existing:
- raise HTTPException(status_code=400, detail=f"分类 '{category_in.name}' 已存在")
-
- update_data = category_in.model_dump(exclude_unset=True)
- # 移除密码字段,不保存到数据库
- update_data.pop('password', None)
-
- for field, value in update_data.items():
- setattr(category, field, value)
-
- db.commit()
- db.refresh(category)
-
- logger.info(f"预设分类更新成功: {category.name} (Operator: {current_user.mobile})")
- return category
- @router.delete("/preset-categories/{category_id}", summary="删除预设分类")
- def delete_preset_category(
- category_id: int,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """删除预设分类(仅超级管理员)"""
- if current_user.role != "SUPER_ADMIN":
- raise HTTPException(status_code=403, detail="权限不足")
-
- category = db.query(AppCategory).filter(AppCategory.id == category_id).first()
- if not category:
- raise HTTPException(status_code=404, detail="预设分类未找到")
-
- # 检查是否有应用使用此分类
- apps_count = db.query(Application).filter(
- Application.category_id == category.id,
- Application.is_deleted == False
- ).count()
-
- if apps_count > 0:
- raise HTTPException(
- status_code=400,
- detail=f"无法删除:有 {apps_count} 个应用正在使用此分类"
- )
-
- db.delete(category)
- db.commit()
-
- logger.info(f"预设分类删除成功: {category.name} (Operator: {current_user.mobile})")
- return {"message": "删除成功"}
- @router.get("/{app_id}", response_model=ApplicationResponse, summary="获取单个应用详情")
- def read_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取单个应用详情。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- # 设置 category_name
- app_dict = {
- **{c.name: getattr(app, c.name) for c in app.__table__.columns},
- "category_name": app.category.name if app.category else None
- }
- return ApplicationResponse(**app_dict)
- @router.post("/", response_model=ApplicationSecretDisplay, summary="创建应用")
- def create_app(
- *,
- db: Session = Depends(deps.get_db),
- app_in: ApplicationCreate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 创建新应用。只会返回一次明文密钥。
- """
- # 1. Generate ID and Secret
- app_id, app_secret = generate_app_credentials()
- # 2. Generate Access Token
- access_token = generate_access_token()
-
- # 3. Store Secret (Plain text needed for HMAC verification)
-
- db_app = Application(
- app_id=app_id,
- app_secret=app_secret,
- access_token=access_token,
- app_name=app_in.app_name,
- icon_url=app_in.icon_url,
- protocol_type=app_in.protocol_type,
- redirect_uris=app_in.redirect_uris,
- notification_url=app_in.notification_url,
- description=app_in.description,
- category_id=app_in.category_id,
- owner_id=current_user.id # Assign owner
- )
-
- # 验证 category_id 是否存在
- if app_in.category_id:
- category = db.query(AppCategory).filter(AppCategory.id == app_in.category_id).first()
- if not category:
- raise HTTPException(status_code=400, detail=f"分类ID {app_in.category_id} 不存在")
-
- db.add(db_app)
- db.commit()
- db.refresh(db_app)
- logger.info(f"应用创建成功: {app_in.app_name} (ID: {app_id}, Owner: {current_user.mobile})")
- # 如果是 OIDC 应用,自动在 Hydra 中创建 / 更新对应的 OAuth2 Client
- if db_app.protocol_type == ProtocolType.OIDC:
- try:
- raw = db_app.redirect_uris or ""
- redirect_uris: list[str] = []
- if raw:
- # 1. 优先按 JSON 解析(支持 ["url1","url2"] 或 "url1")
- try:
- parsed = json.loads(raw)
- if isinstance(parsed, list):
- redirect_uris = [str(u).strip() for u in parsed if str(u).strip()]
- elif isinstance(parsed, str):
- if parsed.strip():
- redirect_uris = [parsed.strip()]
- except Exception:
- # 2. 非 JSON 时,支持逗号分隔或单个 URL
- parts = [u.strip() for u in raw.split(",") if u.strip()]
- if parts:
- redirect_uris = parts
- hydra_service.create_or_update_client(
- client_id=db_app.app_id,
- client_secret=db_app.app_secret,
- redirect_uris=redirect_uris,
- client_name=db_app.app_name or db_app.app_id,
- )
- logger.info(f"Hydra OIDC Client 已创建/更新: {db_app.app_id}, redirect_uris={redirect_uris}")
- except Exception as e:
- logger.exception(
- "应用创建成功,但在 Hydra 创建 OIDC Client 失败 (app_id=%s): %s",
- db_app.app_id,
- e,
- )
- # 如需强一致,可以在这里 raise HTTPException 中断创建流程
- return ApplicationSecretDisplay(app_id=app_id, app_secret=app_secret, access_token=access_token)
- @router.put("/{app_id}", response_model=ApplicationResponse, summary="更新应用")
- def update_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- app_in: ApplicationUpdate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 更新应用信息。需要手机验证码和密码验证。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Security Verification
- if not app_in.verification_code:
- raise HTTPException(status_code=400, detail="需要提供手机验证码")
- if not SmsService.verify_code(current_user.mobile, app_in.verification_code):
- logger.warning(f"应用更新失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
-
- update_data = app_in.model_dump(exclude_unset=True)
- # Remove security fields from update data
- update_data.pop('password', None)
- update_data.pop('verification_code', None)
-
- # 验证 category_id 是否存在
- if 'category_id' in update_data and update_data['category_id']:
- category = db.query(AppCategory).filter(AppCategory.id == update_data['category_id']).first()
- if not category:
- raise HTTPException(status_code=400, detail=f"分类ID {update_data['category_id']} 不存在")
- for field, value in update_data.items():
- setattr(app, field, value)
-
- db.add(app)
- db.commit()
- db.refresh(app)
- # 如果是 OIDC 应用,编辑后同步 Hydra 中的 OAuth2 Client
- if app.protocol_type == ProtocolType.OIDC:
- try:
- raw = app.redirect_uris or ""
- redirect_uris: list[str] = []
- if raw:
- # 1. 优先按 JSON 解析(支持 ["url1","url2"] 或 "url1")
- try:
- parsed = json.loads(raw)
- if isinstance(parsed, list):
- redirect_uris = [str(u).strip() for u in parsed if str(u).strip()]
- elif isinstance(parsed, str):
- if parsed.strip():
- redirect_uris = [parsed.strip()]
- except Exception:
- # 2. 非 JSON 时,支持逗号分隔或单个 URL
- parts = [u.strip() for u in raw.split(",") if u.strip()]
- if parts:
- redirect_uris = parts
- hydra_service.create_or_update_client(
- client_id=app.app_id,
- client_secret=app.app_secret,
- redirect_uris=redirect_uris,
- client_name=app.app_name or app.app_id,
- )
- logger.info(f"Hydra OIDC Client 已在编辑后同步: {app.app_id}, redirect_uris={redirect_uris}")
- except Exception as e:
- logger.exception(
- "应用编辑成功,但在 Hydra 同步 OIDC Client 失败 (app_id=%s): %s",
- app.app_id,
- e,
- )
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.UPDATE,
- details=update_data
- )
-
- logger.info(f"应用更新成功: {app.app_name} (ID: {app.app_id})")
- return app
- @router.delete("/{app_id}", response_model=ApplicationResponse, summary="删除应用")
- def delete_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 软删除应用。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- app.is_deleted = True
- db.add(app)
- db.commit()
-
- logger.info(f"应用删除成功: {app.app_name} (ID: {app.app_id}, Operator: {current_user.mobile})")
- return app
- @router.post("/{app_id}/regenerate-secret", response_model=ApplicationSecretDisplay, summary="重新生成密钥")
- def regenerate_secret(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: RegenerateSecretRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 重新生成应用密钥。需要手机验证码和密码验证。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- # Security Verification
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"重置密钥失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- if not SmsService.verify_code(current_user.mobile, req.verification_code):
- logger.warning(f"重置密钥失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- _, new_secret = generate_app_credentials()
- app.app_secret = new_secret
-
- db.add(app)
- db.commit()
-
- # Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.REGENERATE_SECRET,
- details={"message": "Regenerated App Secret"}
- )
- logger.info(f"应用密钥已重置: {app.app_name} (ID: {app.app_id})")
- return ApplicationSecretDisplay(app_id=app.app_id, app_secret=new_secret, access_token=app.access_token)
- @router.post("/{app_id}/view-secret", response_model=ApplicationSecretDisplay, summary="查看密钥")
- def view_secret(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: ViewSecretRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 查看应用密钥。需要验证用户密码。
- """
- # 1. Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"查看密钥失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
-
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- # Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.VIEW_SECRET,
- details={"message": "Viewed App Secret"}
- )
-
- logger.info(f"查看应用密钥: {app.app_name} (Operator: {current_user.mobile})")
- return ApplicationSecretDisplay(app_id=app.app_id, app_secret=app.app_secret, access_token=app.access_token)
- @router.post("/{app_id}/transfer", response_model=ApplicationResponse, summary="转让应用")
- def transfer_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: ApplicationTransferRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 将应用转让给其他开发者或超级管理员。
- 需要验证:目标用户手机号、当前用户密码、短信验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # 1. Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"转让应用失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- # 2. Verify SMS Code
- if not SmsService.verify_code(current_user.mobile, req.verification_code):
- logger.warning(f"转让应用失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- # 3. Verify Target User
- target_user = db.query(User).filter(User.mobile == req.target_mobile, User.is_deleted == 0).first()
- if not target_user:
- raise HTTPException(status_code=404, detail="目标用户不存在")
-
- if target_user.status != "ACTIVE":
- raise HTTPException(status_code=400, detail="目标用户状态不正常")
-
- if target_user.role not in ["DEVELOPER", "SUPER_ADMIN"]:
- raise HTTPException(status_code=400, detail="目标用户必须是开发者或超级管理员")
-
- if target_user.id == app.owner_id:
- raise HTTPException(status_code=400, detail="应用已归属于该用户")
- # 4. Transfer
- old_owner_id = app.owner_id
- app.owner_id = target_user.id
-
- db.add(app)
- db.commit()
- db.refresh(app)
-
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.TRANSFER,
- target_user_id=target_user.id,
- target_mobile=target_user.mobile,
- details={
- "old_owner_id": old_owner_id,
- "new_owner_id": target_user.id
- }
- )
-
- logger.info(f"应用转让成功: {app.app_name} 从 {current_user.mobile} 转让给 {target_user.mobile}")
-
- return app
- # ==========================================
- # Mappings
- # ==========================================
- @router.get("/{app_id}/mappings", response_model=MappingList, summary="获取应用映射列表")
- def read_mappings(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- skip: int = 0,
- limit: int = 10,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用的账号映射列表。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- query = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id)
- total = query.count()
- mappings = query.order_by(desc(AppUserMapping.id)).offset(skip).limit(limit).all()
-
- # Enrich with user mobile (handled by ORM relation usually, but for Pydantic 'from_attributes')
- # We added `user_mobile` to MappingResponse, so we need to ensure it's populated.
- # The ORM `mapping.user` is lazy loaded, which is fine for sync code.
- result = []
- for m in mappings:
- result.append(MappingResponse(
- id=m.id,
- app_id=m.app_id,
- user_id=m.user_id,
- mapped_key=m.mapped_key,
- mapped_email=m.mapped_email,
- user_mobile=m.user.mobile if m.user else "Deleted User",
- user_status=m.user.status if m.user else "DELETED",
- is_active=m.is_active
- ))
-
- return {"total": total, "items": result}
- @router.post("/{app_id}/mappings", response_model=MappingResponse, summary="创建映射")
- def create_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_in: MappingCreate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 手动创建映射。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Verify Password
- if not security.verify_password(mapping_in.password, current_user.password_hash):
- logger.warning(f"创建映射失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- # Normalize input: treat empty strings as None to avoid unique constraint violations
- mapped_key = mapping_in.mapped_key if mapping_in.mapped_key else None
- mapped_email = mapping_in.mapped_email if mapping_in.mapped_email else None
- # 1. Find User or Create
- user = db.query(User).filter(User.mobile == mapping_in.mobile, User.is_deleted == 0).first()
- new_user_created = False
- generated_password = None
- if not user:
- # Auto create user
- password_plain = security.generate_alphanumeric_password(8) # Random password letters+digits
- random_suffix = security.generate_alphanumeric_password(6)
- user = User(
- mobile=mapping_in.mobile,
- password_hash=security.get_password_hash(password_plain),
- status="ACTIVE",
- role="ORDINARY_USER",
- name=f"用户{random_suffix}",
- english_name=mapped_key
- )
- db.add(user)
- db.commit()
- db.refresh(user)
- new_user_created = True
- generated_password = password_plain
- logger.info(f"自动创建用户: {user.mobile}")
- # 2. Check if mapping exists
- existing = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.user_id == user.id
- ).first()
- if existing:
- raise HTTPException(status_code=400, detail="该用户的映射已存在")
- # 3. Check Uniqueness for mapped_email (if provided)
- if mapped_email:
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_email == mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")
- # 4. Check Uniqueness for mapped_key
- if mapped_key:
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_key == mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")
-
- # 5. Create
- mapping = AppUserMapping(
- app_id=app_id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email
- )
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
-
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.MANUAL_ADD,
- target_user_id=user.id,
- target_mobile=user.mobile,
- details={
- "mapped_key": mapped_key,
- "mapped_email": mapped_email,
- "new_user_created": new_user_created
- }
- )
-
- logger.info(f"映射创建成功: App {app_id} -> User {user.mobile} ({mapped_key})")
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- is_active=mapping.is_active,
- new_user_created=new_user_created,
- generated_password=generated_password
- )
- @router.put("/{app_id}/mappings/{mapping_id}", response_model=MappingResponse, summary="更新映射")
- def update_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_id: int,
- mapping_in: MappingUpdate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 更新映射信息。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Verify Password
- if not security.verify_password(mapping_in.password, current_user.password_hash):
- logger.warning(f"更新映射失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.id == mapping_id,
- AppUserMapping.app_id == app_id
- ).first()
- if not mapping:
- raise HTTPException(status_code=404, detail="映射未找到")
- # Check Uniqueness for mapped_key
- if mapping_in.mapped_key is not None and mapping_in.mapped_key != mapping.mapped_key:
- if mapping_in.mapped_key:
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_key == mapping_in.mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapping_in.mapped_key} 已被使用")
- # Check Uniqueness for mapped_email
- if mapping_in.mapped_email is not None and mapping_in.mapped_email != mapping.mapped_email:
- if mapping_in.mapped_email:
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_email == mapping_in.mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapping_in.mapped_email} 已被使用")
- # Capture old values for logging
- old_key = mapping.mapped_key
- old_email = mapping.mapped_email
- old_is_active = mapping.is_active
- if mapping_in.mapped_key is not None:
- mapping.mapped_key = mapping_in.mapped_key
- if mapping_in.mapped_email is not None:
- mapping.mapped_email = mapping_in.mapped_email
- if mapping_in.is_active is not None:
- mapping.is_active = mapping_in.is_active
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.UPDATE,
- target_user_id=mapping.user_id,
- target_mobile=mapping.user.mobile if mapping.user else None,
- details={
- "old": {
- "mapped_key": old_key,
- "mapped_email": old_email,
- "is_active": old_is_active,
- },
- "new": {
- "mapped_key": mapping.mapped_key,
- "mapped_email": mapping.mapped_email,
- "is_active": mapping.is_active,
- },
- }
- )
-
- logger.info(f"映射更新成功: App {app_id} User {mapping.user.mobile if mapping.user else 'unknown'}")
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=mapping.user.mobile if mapping.user else "Deleted User",
- user_status=mapping.user.status if mapping.user else "DELETED",
- is_active=mapping.is_active
- )
- @router.delete("/{app_id}/mappings/{mapping_id}", summary="删除映射")
- def delete_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_id: int,
- req: MappingDelete,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 删除映射关系。需验证密码。
- """
- # Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"删除映射失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.id == mapping_id,
- AppUserMapping.app_id == app_id
- ).first()
- if not mapping:
- raise HTTPException(status_code=404, detail="映射未找到")
-
- app = db.query(Application).filter(Application.id == app_id).first()
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Capture for logging
- target_user_id = mapping.user_id
- target_mobile = mapping.user.mobile if mapping.user else None
-
- db.delete(mapping)
- db.commit()
-
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.DELETE,
- target_user_id=target_user_id,
- target_mobile=target_mobile,
- details={"mapping_id": mapping_id}
- )
-
- logger.info(f"映射删除成功: App {app_id}, Mapping {mapping_id}")
-
- return {"message": "删除成功"}
- @router.post("/{app_id}/sync-users", summary="同步所有用户")
- def sync_users_to_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 一键导入用户管理中的用户数据到应用映射中。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Get all active users
- users = db.query(User).filter(User.is_deleted == 0).all()
-
- # Get existing mappings (user_ids)
- existing_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
- mapped_user_ids = {m.user_id for m in existing_mappings}
-
- new_mappings = []
- logger.info(f"开始同步用户到应用 {app.app_name} (ID: {app_id})")
-
- for user in users:
- if user.id in mapped_user_ids:
- continue
-
- # Create mapping
- mapped_key = user.english_name if user.english_name else user.mobile
-
- mapping = AppUserMapping(
- app_id=app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=None,
- is_active=True
- )
- new_mappings.append(mapping)
- if new_mappings:
- try:
- db.bulk_save_objects(new_mappings)
- db.commit()
- logger.info(f"用户同步完成: 新增 {len(new_mappings)} 条映射")
- except Exception as e:
- db.rollback()
- logger.error(f"批量同步用户失败: {e}。尝试逐条插入。")
-
- # Fallback: try one by one
- success_count = 0
- for m in new_mappings:
- try:
- db.add(m)
- db.commit()
- success_count += 1
- except Exception as ex:
- db.rollback()
- logger.warning(f"单个用户映射失败 (User: {m.user_id}): {ex}")
-
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details={"message": "Sync all users (partial)", "attempted": len(new_mappings), "success": success_count}
- )
- return {"message": f"同步完成,成功 {success_count} 个,失败 {len(new_mappings) - success_count} 个 (可能是账号冲突)"}
-
- # Log success
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details={"message": "Sync all users", "count": len(new_mappings)}
- )
- return {"message": f"同步成功,新增 {len(new_mappings)} 个用户映射"}
- logger.info("用户同步: 没有需要同步的新用户")
- return {"message": "没有需要同步的用户"}
- @router.post("/{app_id}/sync-users-v2", summary="同步用户 (新版)")
- def sync_users_to_app_v2(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- sync_req: AppSyncRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 高级用户同步功能。
- 支持全量/部分同步,以及可选的默认邮箱初始化。
- 需要手机验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # 1. Verify SMS Code
- if not SmsService.verify_code(current_user.mobile, sync_req.verification_code):
- logger.warning(f"同步用户失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- # 2. Determine Target Users
- query = db.query(User).filter(User.is_deleted == 0)
-
- if sync_req.mode == "SELECTED":
- if not sync_req.user_ids:
- raise HTTPException(status_code=400, detail="请选择要同步的用户")
- query = query.filter(User.id.in_(sync_req.user_ids))
-
- users = query.all()
-
- if not users:
- return {"message": "没有找到可同步的用户"}
- # 3. Get existing mappings (user_ids) to skip
- existing_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
- mapped_user_ids = {m.user_id for m in existing_mappings}
-
- # Check if email domain is valid format if provided (simple check)
- if sync_req.init_email and not sync_req.email_domain:
- raise HTTPException(status_code=400, detail="开启邮箱初始化时必须填写域名")
- new_mappings = []
-
- for user in users:
- if user.id in mapped_user_ids:
- continue
-
- mapped_key = user.english_name if user.english_name else user.mobile
-
- mapped_email = None
- if sync_req.init_email and user.english_name:
- # Construct email
- domain = sync_req.email_domain.strip()
- if not domain.startswith("@"):
- domain = "@" + domain
- mapped_email = f"{user.english_name}{domain}"
-
- mapping = AppUserMapping(
- app_id=app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email,
- is_active=True
- )
- new_mappings.append(mapping)
- if not new_mappings:
- return {"message": "所有选中的用户均已存在映射,无需同步"}
- # 4. Insert
- logger.info(f"开始同步(v2)用户到应用 {app.app_name},计划新增 {len(new_mappings)} 条")
-
- success_count = 0
- fail_count = 0
-
- for m in new_mappings:
- try:
- # Additional check: uniqueness of mapped_key in this app
- db.add(m)
- db.commit()
- success_count += 1
- except Exception as e:
- db.rollback()
- fail_count += 1
- logger.warning(f"同步单个映射失败 (User: {m.user_id}): {e}")
-
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.SYNC,
- details={
- "mode": sync_req.mode,
- "init_email": sync_req.init_email,
- "total_attempted": len(new_mappings),
- "success": success_count,
- "failed": fail_count
- }
- )
-
- logger.info(f"同步(v2)完成。成功: {success_count}, 失败: {fail_count}")
- msg = f"同步完成。成功: {success_count},失败: {fail_count}"
- if fail_count > 0:
- msg += " (失败原因可能是账号或邮箱冲突)"
-
- return {"message": msg}
- @router.get("/{app_id}/mappings/export", summary="导出映射")
- def export_mappings(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 导出所有映射到 Excel (.xlsx)。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
-
- # Prepare data for DataFrame
- data = []
- for m in mappings:
- mobile = m.user.mobile if m.user else "Deleted User"
- data.append({
- '手机号': mobile,
- '映射账号': m.mapped_key,
- '映射邮箱': m.mapped_email or ''
- })
-
- # Create DataFrame
- df = pd.DataFrame(data)
-
- # If no data, create an empty DataFrame with columns
- if not data:
- df = pd.DataFrame(columns=['手机号', '映射账号', '映射邮箱'])
- # Write to Excel BytesIO
- output = io.BytesIO()
- with pd.ExcelWriter(output, engine='openpyxl') as writer:
- df.to_excel(writer, index=False)
-
- output.seek(0)
-
- filename = f"mappings_app_{app_id}.xlsx"
- logger.info(f"导出映射成功: App {app_id}, Count {len(mappings)}")
-
- return StreamingResponse(
- output,
- media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- headers={"Content-Disposition": f"attachment; filename={filename}"}
- )
- @router.post("/{app_id}/mapping/preview", response_model=MappingPreviewResponse, summary="预览映射导入")
- async def preview_mapping(
- app_id: int,
- file: UploadFile = File(...),
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 预览 Excel/CSV 映射导入。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- contents = await file.read()
- filename = file.filename
- try:
- return MappingService.preview_import(db, app_id, contents, filename)
- except Exception as e:
- logger.error(f"导入预览失败: {e}", exc_info=True)
- raise HTTPException(status_code=400, detail=f"解析文件失败: {str(e)}")
- @router.post("/send-import-verification-code", summary="发送导入验证码")
- def send_import_verification_code(
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 发送验证码给当前登录用户(用于敏感操作验证,如导入)。
- """
- SmsService.send_code(current_user.mobile)
- logger.info(f"发送导入验证码: {current_user.mobile}")
- return {"message": "验证码已发送"}
- @router.post("/{app_id}/mapping/import", response_model=ImportLogResponse, summary="执行映射导入")
- async def import_mapping(
- app_id: int,
- file: UploadFile = File(...),
- strategy: MappingStrategy = Form(MappingStrategy.SKIP),
- verification_code: str = Form(...),
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 执行映射导入操作。需要验证短信验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- contents = await file.read()
- filename = file.filename
-
- logger.info(f"开始执行映射导入: App {app_id}, File {filename}, Strategy {strategy}")
- try:
- result = MappingService.execute_import(db, app_id, contents, filename, strategy, current_user.mobile, verification_code)
- except Exception as e:
- logger.error(f"执行映射导入异常: {e}", exc_info=True)
- raise e
-
- # LOGGING
- # For import, we log the summary and the logs structure
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details=result.model_dump(mode='json') # Store full result including logs
- )
-
- logger.info(f"映射导入完成: 成功 {result.summary.inserted + result.summary.updated}, 失败 {result.summary.failed}")
-
- return result
- @router.get("/mapping/users", response_model=UserSyncList, summary="获取全量用户(M2M)")
- def get_all_users_m2m(
- *,
- db: Session = Depends(deps.get_db),
- skip: int = 0,
- limit: int = 100,
- current_app: Application = Depends(deps.get_current_app),
- ):
- """
- 开发者拉取全量用户接口。
- 仅返回:手机号、姓名、英文名。
- 需要应用访问令牌 (Authorization Bearer JWT 或 X-App-Access-Token)。
- """
- query = db.query(User).filter(User.is_deleted == 0)
-
- total = query.count()
- users = query.order_by(User.id).offset(skip).limit(limit).all()
-
- return {"total": total, "items": users}
- @router.post("/mapping/sync", response_model=MappingResponse, summary="同步映射 (M2M)")
- def sync_mapping(
- request: Request,
- *,
- db: Session = Depends(deps.get_db),
- sync_in: UserSyncRequest,
- current_app: Application = Depends(deps.get_current_app),
- ):
- """
- 从外部平台同步用户映射关系(机器对机器)。
- 支持增删改查:
- - UPSERT (默认): 创建或更新映射及用户;必须提供姓名;新建用户时英文名由姓名自动生成并去重。
- - DELETE: 仅删除应用与用户的映射关系,不删除用户。
- 需要应用访问令牌 (Authorization Bearer JWT 或 X-App-Access-Token)。
- """
- # Normalize input: treat empty strings as None (mobile / mapped_key 已在 Schema 中强制校验)
- mapped_key = sync_in.mapped_key
- mapped_email = sync_in.mapped_email if sync_in.mapped_email else None
- logger.info(f"收到 M2M 同步请求: App {current_app.app_id}, Mobile {sync_in.mobile}, Action {sync_in.sync_action}")
- # ==========================================
- # 1. Handle DELETE Action
- # ==========================================
- if sync_in.sync_action == "DELETE":
- # 查找用户
- user = db.query(User).filter(User.mobile == sync_in.mobile).first()
- if not user:
- # 用户不存在,无法删除映射,直接抛出404或视作成功
- logger.warning(f"M2M 删除失败: 用户 {sync_in.mobile} 不存在")
- raise HTTPException(status_code=404, detail="用户不存在")
- # 查找映射
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.user_id == user.id
- ).first()
- if not mapping:
- logger.warning(f"M2M 删除失败: 映射不存在 (User {sync_in.mobile})")
- raise HTTPException(status_code=404, detail="映射关系不存在")
- if mapping.mapped_key is None:
- raise HTTPException(
- status_code=400,
- detail="映射记录缺少外部账号,请在平台侧补全后再删除",
- )
- if mapping.mapped_key != mapped_key:
- raise HTTPException(status_code=400, detail="映射账号与平台记录不一致")
- # 构造返回数据(删除前快照,将状态置为 False)
- resp_data = MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- is_active=False # 标记为非活跃/已删除
- )
- # 执行物理删除 (只删映射,不删用户)
- db.delete(mapping)
- db.commit()
- # 记录日志
- LogService.create_log(
- db=db,
- app_id=current_app.id,
- operator_id=current_app.owner_id,
- action_type=ActionType.DELETE,
- target_user_id=user.id,
- target_mobile=user.mobile,
- ip_address=get_client_ip(request),
- details={
- "mapped_key": mapping.mapped_key,
- "action": "M2M_DELETE",
- "source": "M2M",
- "sync_action": "DELETE",
- },
- )
- logger.info(f"M2M 删除成功: {sync_in.mobile}")
- return resp_data
- # ==========================================
- # 2. Handle UPSERT Action (Existing Logic)
- # ==========================================
- in_name = (sync_in.name or "").strip()
- if not in_name:
- raise HTTPException(status_code=400, detail="同步操作必须提供姓名")
- # 0. Check Uniqueness for Name (Global Check)
- # We exclude the current user (by mobile) to allow updates to self without conflict
- name_conflict = db.query(User).filter(
- User.name == in_name,
- User.mobile != sync_in.mobile,
- ).first()
- if name_conflict:
- raise HTTPException(status_code=400, detail=f"姓名 '{in_name}' 已存在")
- # 1. Find User or Create
- user = db.query(User).filter(User.mobile == sync_in.mobile).first()
- new_user_created = False
-
- if not user:
- # Create New User — english_name 始终由姓名生成并去重(忽略请求中的 english_name)
- in_english_name = generate_english_name(in_name).strip()
- if not in_english_name:
- in_english_name = f"u{sync_in.mobile[-4:]}"
- original_base = in_english_name
- counter = 1
- while db.query(User).filter(
- User.english_name == in_english_name,
- User.is_deleted == 0,
- ).first():
- in_english_name = f"{original_base}{counter}"
- counter += 1
- # Auto create user
- password = security.generate_alphanumeric_password(8) # Random password letters+digits
-
- user = User(
- mobile=sync_in.mobile,
- password_hash=security.get_password_hash(password),
- status="ACTIVE",
- role="ORDINARY_USER",
- name=in_name,
- english_name=in_english_name
- )
- db.add(user)
- db.commit()
- db.refresh(user)
- new_user_created = True
- logger.info(f"M2M 自动创建用户: {sync_in.mobile}")
- else:
- # Update Existing User - 已有用户不允许修改 name、mobile、english_name
- # Check if trying to modify restricted fields
- if sync_in.mobile != user.mobile:
- raise HTTPException(status_code=400, detail="已有用户不允许修改手机号")
-
- if (user.name or "").strip() != in_name:
- raise HTTPException(status_code=400, detail="已有用户不允许修改姓名")
-
- # 已有用户的其他字段(如status等)可以更新,但当前M2M接口不涉及
- # 这里只处理映射关系的更新
- # 2. Handle Mapping
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.user_id == user.id
- ).first()
- # Check Uniqueness for mapped_key (if changing or new, and provided)
- if mapped_key and (not mapping or mapping.mapped_key != mapped_key):
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.mapped_key == mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")
- # Check Uniqueness for mapped_email (if changing or new, and provided)
- if mapped_email and (not mapping or mapping.mapped_email != mapped_email):
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.mapped_email == mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")
- new_mapping_created = False
- if mapping:
- # Update existing mapping
- mapping.mapped_key = mapped_key
- if sync_in.is_active is not None:
- mapping.is_active = sync_in.is_active
- if sync_in.mapped_email is not None:
- mapping.mapped_email = mapped_email
- else:
- # Create new mapping
- new_mapping_created = True
- mapping = AppUserMapping(
- app_id=current_app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email,
- is_active=sync_in.is_active if sync_in.is_active is not None else True
- )
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=current_app.id,
- operator_id=current_app.owner_id,
- action_type=ActionType.SYNC_M2M,
- target_user_id=user.id,
- target_mobile=user.mobile,
- ip_address=get_client_ip(request),
- details={
- "mapped_key": mapped_key,
- "mapped_email": mapped_email,
- "new_user_created": new_user_created,
- "new_mapping_created": new_mapping_created,
- "sync_action": "UPSERT",
- "source": "M2M",
- "is_active": mapping.is_active,
- },
- )
-
- logger.info(f"M2M 同步成功: {sync_in.mobile} (Mapping: {mapping.id})")
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- is_active=mapping.is_active
- )
- # ==========================================
- # Operation Logs
- # ==========================================
- @router.get("/{app_id}/logs", response_model=OperationLogList, summary="获取操作日志")
- def read_logs(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- skip: int = 0,
- limit: int = 20,
- action_type: ActionType = Query(None),
- keyword: str = Query(None, description="搜索手机号"),
- start_date: datetime = Query(None),
- end_date: datetime = Query(None),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用操作日志。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- total, logs = LogService.get_logs(
- db=db,
- app_id=app_id,
- skip=skip,
- limit=limit,
- action_type=action_type,
- keyword=keyword,
- start_date=start_date,
- end_date=end_date
- )
-
- result = []
- for log in logs:
- # Enrich operator mobile
- operator_mobile = log.operator.mobile if log.operator else "Unknown"
-
- result.append(OperationLogResponse(
- id=log.id,
- app_id=log.app_id,
- action_type=log.action_type,
- target_mobile=log.target_mobile,
- details=log.details,
- operator_id=log.operator_id,
- operator_mobile=operator_mobile,
- target_user_id=log.target_user_id,
- created_at=log.created_at
- ))
-
- return {"total": total, "items": result}
|