| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177 |
- import secrets
- import string
- import io
- import csv
- import pandas as pd
- from typing import List
- from datetime import datetime
- from fastapi import APIRouter, Depends, HTTPException, Response, UploadFile, File, Form, Query
- from fastapi.responses import StreamingResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import desc
- from app.api.v1 import deps
- from app.core import security
- from app.models.application import Application
- from app.models.user import User
- from app.models.mapping import AppUserMapping
- from app.schemas.application import (
- ApplicationCreate,
- ApplicationUpdate,
- ApplicationResponse,
- ApplicationList,
- ApplicationSecretDisplay,
- ViewSecretRequest,
- RegenerateSecretRequest,
- ApplicationTransferRequest,
- AppSyncRequest
- )
- from app.schemas.mapping import (
- MappingList,
- MappingResponse,
- MappingCreate,
- MappingUpdate,
- MappingDelete,
- MappingPreviewResponse,
- MappingImportSummary,
- MappingStrategy,
- ImportLogResponse
- )
- from app.schemas.user import UserSyncRequest, UserSyncList
- from app.services.mapping_service import MappingService
- from app.services.sms_service import SmsService
- from app.services.log_service import LogService
- from app.schemas.operation_log import ActionType, OperationLogList, OperationLogResponse
- router = APIRouter()
- def generate_access_token():
- return secrets.token_urlsafe(32)
- def generate_app_credentials():
- # Generate a random 16-char App ID (hex or alphanumeric)
- app_id = "app_" + secrets.token_hex(8)
- # Generate a strong 32-char App Secret
- alphabet = string.ascii_letters + string.digits
- app_secret = ''.join(secrets.choice(alphabet) for i in range(32))
- return app_id, app_secret
- @router.get("/", response_model=ApplicationList, summary="获取应用列表")
- def read_apps(
- skip: int = 0,
- limit: int = 10,
- search: str = None,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用列表(分页)。
- 超级管理员可以查看所有,开发者只能查看自己的应用。
- """
- query = db.query(Application).filter(Application.is_deleted == False)
-
- if current_user.role != "SUPER_ADMIN":
- query = query.filter(Application.owner_id == current_user.id)
-
- if search:
- # Search by name or app_id
- from sqlalchemy import or_
- query = query.filter(
- or_(
- Application.app_name.ilike(f"%{search}%"),
- Application.app_id.ilike(f"%{search}%")
- )
- )
-
- total = query.count()
- apps = query.order_by(desc(Application.id)).offset(skip).limit(limit).all()
- return {"total": total, "items": apps}
- @router.get("/{app_id}", response_model=ApplicationResponse, summary="获取单个应用详情")
- def read_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取单个应用详情。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- return app
- @router.post("/", response_model=ApplicationSecretDisplay, summary="创建应用")
- def create_app(
- *,
- db: Session = Depends(deps.get_db),
- app_in: ApplicationCreate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 创建新应用。只会返回一次明文密钥。
- """
- # 1. Generate ID and Secret
- app_id, app_secret = generate_app_credentials()
- # 2. Generate Access Token
- access_token = generate_access_token()
-
- # 3. Store Secret (Plain text needed for HMAC verification)
-
- db_app = Application(
- app_id=app_id,
- app_secret=app_secret,
- access_token=access_token,
- app_name=app_in.app_name,
- icon_url=app_in.icon_url,
- protocol_type=app_in.protocol_type,
- redirect_uris=app_in.redirect_uris,
- notification_url=app_in.notification_url,
- owner_id=current_user.id # Assign owner
- )
-
- db.add(db_app)
- db.commit()
- db.refresh(db_app)
- return ApplicationSecretDisplay(app_id=app_id, app_secret=app_secret, access_token=access_token)
- @router.put("/{app_id}", response_model=ApplicationResponse, summary="更新应用")
- def update_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- app_in: ApplicationUpdate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 更新应用信息。需要手机验证码和密码验证。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Security Verification
- if not app_in.password or not app_in.verification_code:
- raise HTTPException(status_code=400, detail="需要提供密码和手机验证码")
- if not security.verify_password(app_in.password, current_user.password_hash):
- raise HTTPException(status_code=401, detail="密码错误")
- if not SmsService.verify_code(current_user.mobile, app_in.verification_code):
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
-
- update_data = app_in.model_dump(exclude_unset=True)
- # Remove security fields from update data
- update_data.pop('password', None)
- update_data.pop('verification_code', None)
- for field, value in update_data.items():
- setattr(app, field, value)
-
- db.add(app)
- db.commit()
- db.refresh(app)
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.UPDATE,
- details=update_data
- )
- return app
- @router.delete("/{app_id}", response_model=ApplicationResponse, summary="删除应用")
- def delete_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 软删除应用。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- app.is_deleted = True
- db.add(app)
- db.commit()
- return app
- @router.post("/{app_id}/regenerate-secret", response_model=ApplicationSecretDisplay, summary="重新生成密钥")
- def regenerate_secret(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: RegenerateSecretRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 重新生成应用密钥。需要手机验证码和密码验证。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- # Security Verification
- if not security.verify_password(req.password, current_user.password_hash):
- raise HTTPException(status_code=401, detail="密码错误")
- if not SmsService.verify_code(current_user.mobile, req.verification_code):
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- _, new_secret = generate_app_credentials()
- app.app_secret = new_secret
-
- db.add(app)
- db.commit()
-
- # Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.REGENERATE_SECRET,
- details={"message": "Regenerated App Secret"}
- )
- return ApplicationSecretDisplay(app_id=app.app_id, app_secret=new_secret, access_token=app.access_token)
- @router.post("/{app_id}/view-secret", response_model=ApplicationSecretDisplay, summary="查看密钥")
- def view_secret(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: ViewSecretRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 查看应用密钥。需要验证用户密码。
- """
- # 1. Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- raise HTTPException(status_code=401, detail="密码错误")
-
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- # Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.VIEW_SECRET,
- details={"message": "Viewed App Secret"}
- )
- return ApplicationSecretDisplay(app_id=app.app_id, app_secret=app.app_secret, access_token=app.access_token)
- @router.post("/{app_id}/transfer", response_model=ApplicationResponse, summary="转让应用")
- def transfer_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: ApplicationTransferRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 将应用转让给其他开发者或超级管理员。
- 需要验证:目标用户手机号、当前用户密码、短信验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # 1. Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- raise HTTPException(status_code=401, detail="密码错误")
- # 2. Verify SMS Code
- if not SmsService.verify_code(current_user.mobile, req.verification_code):
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- # 3. Verify Target User
- target_user = db.query(User).filter(User.mobile == req.target_mobile, User.is_deleted == 0).first()
- if not target_user:
- raise HTTPException(status_code=404, detail="目标用户不存在")
-
- if target_user.status != "ACTIVE":
- raise HTTPException(status_code=400, detail="目标用户状态不正常")
-
- if target_user.role not in ["DEVELOPER", "SUPER_ADMIN"]:
- raise HTTPException(status_code=400, detail="目标用户必须是开发者或超级管理员")
-
- if target_user.id == app.owner_id:
- raise HTTPException(status_code=400, detail="应用已归属于该用户")
- # 4. Transfer
- old_owner_id = app.owner_id
- app.owner_id = target_user.id
-
- db.add(app)
- db.commit()
- db.refresh(app)
-
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.TRANSFER,
- target_user_id=target_user.id,
- target_mobile=target_user.mobile,
- details={
- "old_owner_id": old_owner_id,
- "new_owner_id": target_user.id
- }
- )
-
- return app
- # ==========================================
- # Mappings
- # ==========================================
- @router.get("/{app_id}/mappings", response_model=MappingList, summary="获取应用映射列表")
- def read_mappings(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- skip: int = 0,
- limit: int = 10,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用的账号映射列表。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- query = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id)
- total = query.count()
- mappings = query.order_by(desc(AppUserMapping.id)).offset(skip).limit(limit).all()
-
- # Enrich with user mobile (handled by ORM relation usually, but for Pydantic 'from_attributes')
- # We added `user_mobile` to MappingResponse, so we need to ensure it's populated.
- # The ORM `mapping.user` is lazy loaded, which is fine for sync code.
- result = []
- for m in mappings:
- result.append(MappingResponse(
- id=m.id,
- app_id=m.app_id,
- user_id=m.user_id,
- mapped_key=m.mapped_key,
- mapped_email=m.mapped_email,
- user_mobile=m.user.mobile if m.user else "Deleted User",
- user_status=m.user.status if m.user else "DELETED",
- is_active=m.is_active
- ))
-
- return {"total": total, "items": result}
- @router.post("/{app_id}/mappings", response_model=MappingResponse, summary="创建映射")
- def create_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_in: MappingCreate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 手动创建映射。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Verify Password
- if not security.verify_password(mapping_in.password, current_user.password_hash):
- raise HTTPException(status_code=401, detail="密码错误")
- # Normalize input: treat empty strings as None to avoid unique constraint violations
- mapped_key = mapping_in.mapped_key if mapping_in.mapped_key else None
- mapped_email = mapping_in.mapped_email if mapping_in.mapped_email else None
- # 1. Find User or Create
- user = db.query(User).filter(User.mobile == mapping_in.mobile, User.is_deleted == 0).first()
- new_user_created = False
- generated_password = None
- if not user:
- # Auto create user
- password_plain = security.generate_alphanumeric_password(8) # Random password letters+digits
- random_suffix = security.generate_alphanumeric_password(6)
- user = User(
- mobile=mapping_in.mobile,
- password_hash=security.get_password_hash(password_plain),
- status="ACTIVE",
- role="ORDINARY_USER",
- name=f"用户{random_suffix}",
- english_name=mapped_key
- )
- db.add(user)
- db.commit()
- db.refresh(user)
- new_user_created = True
- generated_password = password_plain
- # 2. Check if mapping exists
- existing = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.user_id == user.id
- ).first()
- if existing:
- raise HTTPException(status_code=400, detail="该用户的映射已存在")
- # 3. Check Uniqueness for mapped_email (if provided)
- if mapped_email:
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_email == mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")
- # 4. Check Uniqueness for mapped_key
- if mapped_key:
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_key == mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")
-
- # 5. Create
- mapping = AppUserMapping(
- app_id=app_id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email
- )
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
-
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.MANUAL_ADD,
- target_user_id=user.id,
- target_mobile=user.mobile,
- details={
- "mapped_key": mapped_key,
- "mapped_email": mapped_email,
- "new_user_created": new_user_created
- }
- )
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- new_user_created=new_user_created,
- generated_password=generated_password
- )
- @router.put("/{app_id}/mappings/{mapping_id}", response_model=MappingResponse, summary="更新映射")
- def update_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_id: int,
- mapping_in: MappingUpdate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 更新映射信息。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Verify Password
- if not security.verify_password(mapping_in.password, current_user.password_hash):
- raise HTTPException(status_code=401, detail="密码错误")
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.id == mapping_id,
- AppUserMapping.app_id == app_id
- ).first()
- if not mapping:
- raise HTTPException(status_code=404, detail="映射未找到")
- # Check Uniqueness for mapped_key
- if mapping_in.mapped_key is not None and mapping_in.mapped_key != mapping.mapped_key:
- if mapping_in.mapped_key:
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_key == mapping_in.mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapping_in.mapped_key} 已被使用")
- # Check Uniqueness for mapped_email
- if mapping_in.mapped_email is not None and mapping_in.mapped_email != mapping.mapped_email:
- if mapping_in.mapped_email:
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_email == mapping_in.mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapping_in.mapped_email} 已被使用")
- # Capture old values for logging
- old_key = mapping.mapped_key
- old_email = mapping.mapped_email
- if mapping_in.mapped_key is not None:
- mapping.mapped_key = mapping_in.mapped_key
- if mapping_in.mapped_email is not None:
- mapping.mapped_email = mapping_in.mapped_email
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.UPDATE,
- target_user_id=mapping.user_id,
- target_mobile=mapping.user.mobile if mapping.user else None,
- details={
- "old": {"mapped_key": old_key, "mapped_email": old_email},
- "new": {"mapped_key": mapping.mapped_key, "mapped_email": mapping.mapped_email}
- }
- )
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=mapping.user.mobile if mapping.user else "Deleted User",
- user_status=mapping.user.status if mapping.user else "DELETED",
- is_active=mapping.is_active
- )
- @router.delete("/{app_id}/mappings/{mapping_id}", summary="删除映射")
- def delete_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_id: int,
- req: MappingDelete,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 删除映射关系。需验证密码。
- """
- # Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- raise HTTPException(status_code=401, detail="密码错误")
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.id == mapping_id,
- AppUserMapping.app_id == app_id
- ).first()
- if not mapping:
- raise HTTPException(status_code=404, detail="映射未找到")
-
- app = db.query(Application).filter(Application.id == app_id).first()
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Capture for logging
- target_user_id = mapping.user_id
- target_mobile = mapping.user.mobile if mapping.user else None
-
- db.delete(mapping)
- db.commit()
-
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.DELETE,
- target_user_id=target_user_id,
- target_mobile=target_mobile,
- details={"mapping_id": mapping_id}
- )
-
- return {"message": "删除成功"}
- @router.post("/{app_id}/sync-users", summary="同步所有用户")
- def sync_users_to_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 一键导入用户管理中的用户数据到应用映射中。
- 规则:如果用户已在映射中(基于手机号/User ID),则跳过。
- 否则创建映射,使用英文名称作为映射账号(如果为空则使用手机号)。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Get all active users
- users = db.query(User).filter(User.is_deleted == 0).all()
-
- # Get existing mappings (user_ids)
- existing_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
- mapped_user_ids = {m.user_id for m in existing_mappings}
-
- new_mappings = []
-
- for user in users:
- if user.id in mapped_user_ids:
- continue
-
- # Create mapping
- # Rule: Use English name as mapped_key. Fallback to mobile.
- mapped_key = user.english_name if user.english_name else user.mobile
-
- # Check if mapped_key is already used in this app (unlikely for User ID check passed, but mapped_key might collide if english names are dupes)
- # However, bulk insert for thousands might be slow if we check one by one.
- # Ideally we should fetch all existing mapped_keys too.
-
- # For simplicity in "one-click sync", we might just proceed.
- # But if mapped_key is not unique in AppUserMapping (uq_app_mapped_key), it will fail.
- # Let's assume English Name is unique enough or we catch error?
- # Actually, let's verify if english_name is unique in User table. It's not (User.english_name is nullable and not unique).
- # So multiple users might have same english_name.
- # If so, we should probably append mobile or something to make it unique?
- # Or just use mobile if english_name is duplicate?
-
- # Re-reading: "没有存在则使用手机号码,英文名称作为账号映射" -> "If not exists, use phone number, English name as account mapping".
- # This could mean: Use English Name.
-
- mapping = AppUserMapping(
- app_id=app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=None,
- is_active=True
- )
- new_mappings.append(mapping)
- if new_mappings:
- try:
- db.bulk_save_objects(new_mappings)
- db.commit()
- except Exception as e:
- db.rollback()
- # If bulk fails (e.g. unique constraint on mapped_key), we might need to do row-by-row or handle it.
- # Fallback: try one by one
- success_count = 0
- for m in new_mappings:
- try:
- db.add(m)
- db.commit()
- success_count += 1
- except:
- db.rollback()
-
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details={"message": "Sync all users (partial)", "attempted": len(new_mappings), "success": success_count}
- )
- return {"message": f"同步完成,成功 {success_count} 个,失败 {len(new_mappings) - success_count} 个 (可能是账号冲突)"}
-
- # Log success
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details={"message": "Sync all users", "count": len(new_mappings)}
- )
- return {"message": f"同步成功,新增 {len(new_mappings)} 个用户映射"}
- return {"message": "没有需要同步的用户"}
- @router.post("/{app_id}/sync-users-v2", summary="同步用户 (新版)")
- def sync_users_to_app_v2(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- sync_req: AppSyncRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 高级用户同步功能。
- 支持全量/部分同步,以及可选的默认邮箱初始化。
- 需要手机验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # 1. Verify SMS Code
- if not SmsService.verify_code(current_user.mobile, sync_req.verification_code):
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- # 2. Determine Target Users
- query = db.query(User).filter(User.is_deleted == 0)
-
- if sync_req.mode == "SELECTED":
- if not sync_req.user_ids:
- raise HTTPException(status_code=400, detail="请选择要同步的用户")
- query = query.filter(User.id.in_(sync_req.user_ids))
-
- users = query.all()
-
- if not users:
- return {"message": "没有找到可同步的用户"}
- # 3. Get existing mappings (user_ids) to skip
- existing_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
- mapped_user_ids = {m.user_id for m in existing_mappings}
-
- # Check if email domain is valid format if provided (simple check)
- if sync_req.init_email and not sync_req.email_domain:
- raise HTTPException(status_code=400, detail="开启邮箱初始化时必须填写域名")
- new_mappings = []
-
- for user in users:
- if user.id in mapped_user_ids:
- continue
-
- # Determine mapped_key (English name)
- # If english_name is missing, fallback to mobile? Or skip?
- # User requirement: "账号就是英文名" (Account is English name)
- # Assuming if english_name is empty, we might use mobile or some generation.
- # Let's fallback to mobile if english_name is missing, but prefer english_name.
- mapped_key = user.english_name if user.english_name else user.mobile
-
- mapped_email = None
- if sync_req.init_email and user.english_name:
- # Construct email
- domain = sync_req.email_domain.strip()
- if not domain.startswith("@"):
- domain = "@" + domain
- mapped_email = f"{user.english_name}{domain}"
-
- mapping = AppUserMapping(
- app_id=app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email,
- is_active=True
- )
- new_mappings.append(mapping)
- if not new_mappings:
- return {"message": "所有选中的用户均已存在映射,无需同步"}
- # 4. Insert
- # We'll try to insert one by one to handle potential unique constraint violations (e.g. same english name for different users)
- # Or we can try bulk and catch. Given "User can check selection", maybe best effort is good.
-
- success_count = 0
- fail_count = 0
-
- for m in new_mappings:
- try:
- # Check unique key collision within this transaction if possible,
- # but db.add + commit per row is safer for partial success report.
-
- # Additional check: uniqueness of mapped_key in this app
- # (We already have uniqueness constraint in DB)
- db.add(m)
- db.commit()
- success_count += 1
- except Exception:
- db.rollback()
- fail_count += 1
-
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.SYNC,
- details={
- "mode": sync_req.mode,
- "init_email": sync_req.init_email,
- "total_attempted": len(new_mappings),
- "success": success_count,
- "failed": fail_count
- }
- )
-
- msg = f"同步完成。成功: {success_count},失败: {fail_count}"
- if fail_count > 0:
- msg += " (失败原因可能是账号或邮箱冲突)"
-
- return {"message": msg}
- @router.get("/{app_id}/mappings/export", summary="导出映射")
- def export_mappings(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 导出所有映射到 Excel (.xlsx)。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
-
- # Prepare data for DataFrame
- data = []
- for m in mappings:
- mobile = m.user.mobile if m.user else "Deleted User"
- data.append({
- '手机号': mobile,
- '映射账号': m.mapped_key,
- '映射邮箱': m.mapped_email or ''
- })
-
- # Create DataFrame
- df = pd.DataFrame(data)
-
- # If no data, create an empty DataFrame with columns
- if not data:
- df = pd.DataFrame(columns=['手机号', '映射账号', '映射邮箱'])
- # Write to Excel BytesIO
- output = io.BytesIO()
- with pd.ExcelWriter(output, engine='openpyxl') as writer:
- df.to_excel(writer, index=False)
-
- output.seek(0)
-
- filename = f"mappings_app_{app_id}.xlsx"
-
- return StreamingResponse(
- output,
- media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- headers={"Content-Disposition": f"attachment; filename={filename}"}
- )
- @router.post("/{app_id}/mapping/preview", response_model=MappingPreviewResponse, summary="预览映射导入")
- async def preview_mapping(
- app_id: int,
- file: UploadFile = File(...),
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 预览 Excel/CSV 映射导入。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- contents = await file.read()
- filename = file.filename
- return MappingService.preview_import(db, app_id, contents, filename)
- @router.post("/send-import-verification-code", summary="发送导入验证码")
- def send_import_verification_code(
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 发送验证码给当前登录用户(用于敏感操作验证,如导入)。
- """
- SmsService.send_code(current_user.mobile)
- return {"message": "验证码已发送"}
- @router.post("/{app_id}/mapping/import", response_model=ImportLogResponse, summary="执行映射导入")
- async def import_mapping(
- app_id: int,
- file: UploadFile = File(...),
- strategy: MappingStrategy = Form(MappingStrategy.SKIP),
- verification_code: str = Form(...),
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 执行映射导入操作。需要验证短信验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- contents = await file.read()
- filename = file.filename
-
- result = MappingService.execute_import(db, app_id, contents, filename, strategy, current_user.mobile, verification_code)
-
- # LOGGING
- # For import, we log the summary and the logs structure
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details=result.model_dump(mode='json') # Store full result including logs
- )
-
- return result
- @router.get("/mapping/users", response_model=UserSyncList, summary="获取全量用户(M2M)")
- def get_all_users_m2m(
- *,
- db: Session = Depends(deps.get_db),
- skip: int = 0,
- limit: int = 100,
- current_app: Application = Depends(deps.get_current_app),
- ):
- """
- 开发者拉取全量用户接口。
- 仅返回:手机号、姓名、英文名。
- 需要应用访问令牌 (Authorization Bearer JWT 或 X-App-Access-Token)。
- """
- query = db.query(User).filter(User.is_deleted == 0)
-
- total = query.count()
- users = query.order_by(User.id).offset(skip).limit(limit).all()
-
- return {"total": total, "items": users}
- @router.post("/mapping/sync", response_model=MappingResponse, summary="同步映射 (M2M)")
- def sync_mapping(
- *,
- db: Session = Depends(deps.get_db),
- sync_in: UserSyncRequest,
- current_app: Application = Depends(deps.get_current_app),
- ):
- """
- 从外部平台同步用户映射关系(机器对机器)。
- 只同步映射关系,不创建或更新用户本身。
- 需要应用访问令牌 (Authorization Bearer JWT 或 X-App-Access-Token)。
- """
- # Normalize input: treat empty strings as None
- mapped_key = sync_in.mapped_key if sync_in.mapped_key else None
- mapped_email = sync_in.mapped_email if sync_in.mapped_email else None
- # 1. Find User or Create
- user = db.query(User).filter(User.mobile == sync_in.mobile).first()
- new_user_created = False
- if not user:
- # Auto create user
- password = security.generate_alphanumeric_password(8) # Random password letters+digits
- random_suffix = security.generate_alphanumeric_password(6)
- user = User(
- mobile=sync_in.mobile,
- password_hash=security.get_password_hash(password),
- status="ACTIVE",
- role="ORDINARY_USER",
- name=f"用户{random_suffix}",
- english_name=mapped_key
- )
- db.add(user)
- db.commit()
- db.refresh(user)
- new_user_created = True
- # 2. Handle Mapping
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.user_id == user.id
- ).first()
- # Check Uniqueness for mapped_key (if changing or new, and provided)
- if mapped_key and (not mapping or mapping.mapped_key != mapped_key):
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.mapped_key == mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")
- # Check Uniqueness for mapped_email (if changing or new, and provided)
- if mapped_email and (not mapping or mapping.mapped_email != mapped_email):
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.mapped_email == mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")
- new_mapping_created = False
- if mapping:
- # Update existing mapping
- if sync_in.mapped_key is not None:
- mapping.mapped_key = mapped_key
- if sync_in.is_active is not None:
- mapping.is_active = sync_in.is_active
- if sync_in.mapped_email is not None:
- mapping.mapped_email = mapped_email
- else:
- # Create new mapping
- new_mapping_created = True
- mapping = AppUserMapping(
- app_id=current_app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email,
- is_active=sync_in.is_active if sync_in.is_active is not None else True
- )
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=current_app.id,
- operator_id=current_app.owner_id,
- action_type=ActionType.SYNC_M2M,
- target_user_id=user.id,
- target_mobile=user.mobile,
- details={
- "mapped_key": mapped_key,
- "mapped_email": mapped_email,
- "new_user_created": new_user_created,
- "new_mapping_created": new_mapping_created
- }
- )
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- is_active=mapping.is_active
- )
- # ==========================================
- # Operation Logs
- # ==========================================
- @router.get("/{app_id}/logs", response_model=OperationLogList, summary="获取操作日志")
- def read_logs(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- skip: int = 0,
- limit: int = 20,
- action_type: ActionType = Query(None),
- keyword: str = Query(None, description="搜索手机号"),
- start_date: datetime = Query(None),
- end_date: datetime = Query(None),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用操作日志。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- total, logs = LogService.get_logs(
- db=db,
- app_id=app_id,
- skip=skip,
- limit=limit,
- action_type=action_type,
- keyword=keyword,
- start_date=start_date,
- end_date=end_date
- )
-
- result = []
- for log in logs:
- # Enrich operator mobile
- operator_mobile = log.operator.mobile if log.operator else "Unknown"
-
- result.append(OperationLogResponse(
- id=log.id,
- app_id=log.app_id,
- action_type=log.action_type,
- target_mobile=log.target_mobile,
- details=log.details,
- operator_id=log.operator_id,
- operator_mobile=operator_mobile,
- target_user_id=log.target_user_id,
- created_at=log.created_at
- ))
-
- return {"total": total, "items": result}
|