| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307 |
- import secrets
- import string
- import io
- import csv
- import pandas as pd
- import logging
- from typing import List
- from datetime import datetime
- from fastapi import APIRouter, Depends, HTTPException, Response, UploadFile, File, Form, Query
- from fastapi.responses import StreamingResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import desc, or_
- from app.api.v1 import deps
- from app.core import security
- from app.models.application import Application
- from app.models.user import User
- from app.models.mapping import AppUserMapping
- from app.core.utils import generate_english_name
- from app.schemas.application import (
- ApplicationCreate,
- ApplicationUpdate,
- ApplicationResponse,
- ApplicationList,
- ApplicationSecretDisplay,
- ViewSecretRequest,
- RegenerateSecretRequest,
- ApplicationTransferRequest,
- AppSyncRequest
- )
- from app.schemas.mapping import (
- MappingList,
- MappingResponse,
- MappingCreate,
- MappingUpdate,
- MappingDelete,
- MappingPreviewResponse,
- MappingImportSummary,
- MappingStrategy,
- ImportLogResponse
- )
- from app.schemas.user import UserSyncRequest, UserSyncList
- from app.services.mapping_service import MappingService
- from app.services.sms_service import SmsService
- from app.services.log_service import LogService
- from app.schemas.operation_log import ActionType, OperationLogList, OperationLogResponse
- router = APIRouter()
- logger = logging.getLogger(__name__)
- def generate_access_token():
- return secrets.token_urlsafe(32)
- def generate_app_credentials():
- # Generate a random 16-char App ID (hex or alphanumeric)
- app_id = "app_" + secrets.token_hex(8)
- # Generate a strong 32-char App Secret
- alphabet = string.ascii_letters + string.digits
- app_secret = ''.join(secrets.choice(alphabet) for i in range(32))
- return app_id, app_secret
- @router.get("/", response_model=ApplicationList, summary="获取应用列表")
- def read_apps(
- skip: int = 0,
- limit: int = 10,
- search: str = None,
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用列表(分页)。
- 超级管理员可以查看所有,开发者只能查看自己的应用。
- """
- query = db.query(Application).filter(Application.is_deleted == False)
-
- if current_user.role != "SUPER_ADMIN":
- query = query.filter(Application.owner_id == current_user.id)
-
- if search:
- # Search by name or app_id
- query = query.filter(
- or_(
- Application.app_name.ilike(f"%{search}%"),
- Application.app_id.ilike(f"%{search}%")
- )
- )
-
- total = query.count()
- apps = query.order_by(desc(Application.id)).offset(skip).limit(limit).all()
- return {"total": total, "items": apps}
- @router.get("/{app_id}", response_model=ApplicationResponse, summary="获取单个应用详情")
- def read_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取单个应用详情。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- return app
- @router.post("/", response_model=ApplicationSecretDisplay, summary="创建应用")
- def create_app(
- *,
- db: Session = Depends(deps.get_db),
- app_in: ApplicationCreate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 创建新应用。只会返回一次明文密钥。
- """
- # 1. Generate ID and Secret
- app_id, app_secret = generate_app_credentials()
- # 2. Generate Access Token
- access_token = generate_access_token()
-
- # 3. Store Secret (Plain text needed for HMAC verification)
-
- db_app = Application(
- app_id=app_id,
- app_secret=app_secret,
- access_token=access_token,
- app_name=app_in.app_name,
- icon_url=app_in.icon_url,
- protocol_type=app_in.protocol_type,
- redirect_uris=app_in.redirect_uris,
- notification_url=app_in.notification_url,
- owner_id=current_user.id # Assign owner
- )
-
- db.add(db_app)
- db.commit()
- db.refresh(db_app)
- logger.info(f"应用创建成功: {app_in.app_name} (ID: {app_id}, Owner: {current_user.mobile})")
-
- return ApplicationSecretDisplay(app_id=app_id, app_secret=app_secret, access_token=access_token)
- @router.put("/{app_id}", response_model=ApplicationResponse, summary="更新应用")
- def update_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- app_in: ApplicationUpdate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 更新应用信息。需要手机验证码和密码验证。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Security Verification
- if not app_in.verification_code:
- raise HTTPException(status_code=400, detail="需要提供手机验证码")
- if not SmsService.verify_code(current_user.mobile, app_in.verification_code):
- logger.warning(f"应用更新失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
-
- update_data = app_in.model_dump(exclude_unset=True)
- # Remove security fields from update data
- update_data.pop('password', None)
- update_data.pop('verification_code', None)
- for field, value in update_data.items():
- setattr(app, field, value)
-
- db.add(app)
- db.commit()
- db.refresh(app)
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.UPDATE,
- details=update_data
- )
-
- logger.info(f"应用更新成功: {app.app_name} (ID: {app.app_id})")
- return app
- @router.delete("/{app_id}", response_model=ApplicationResponse, summary="删除应用")
- def delete_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 软删除应用。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- app.is_deleted = True
- db.add(app)
- db.commit()
-
- logger.info(f"应用删除成功: {app.app_name} (ID: {app.app_id}, Operator: {current_user.mobile})")
- return app
- @router.post("/{app_id}/regenerate-secret", response_model=ApplicationSecretDisplay, summary="重新生成密钥")
- def regenerate_secret(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: RegenerateSecretRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 重新生成应用密钥。需要手机验证码和密码验证。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- # Security Verification
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"重置密钥失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- if not SmsService.verify_code(current_user.mobile, req.verification_code):
- logger.warning(f"重置密钥失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- _, new_secret = generate_app_credentials()
- app.app_secret = new_secret
-
- db.add(app)
- db.commit()
-
- # Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.REGENERATE_SECRET,
- details={"message": "Regenerated App Secret"}
- )
- logger.info(f"应用密钥已重置: {app.app_name} (ID: {app.app_id})")
- return ApplicationSecretDisplay(app_id=app.app_id, app_secret=new_secret, access_token=app.access_token)
- @router.post("/{app_id}/view-secret", response_model=ApplicationSecretDisplay, summary="查看密钥")
- def view_secret(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: ViewSecretRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 查看应用密钥。需要验证用户密码。
- """
- # 1. Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"查看密钥失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
-
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
-
- # Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.VIEW_SECRET,
- details={"message": "Viewed App Secret"}
- )
-
- logger.info(f"查看应用密钥: {app.app_name} (Operator: {current_user.mobile})")
- return ApplicationSecretDisplay(app_id=app.app_id, app_secret=app.app_secret, access_token=app.access_token)
- @router.post("/{app_id}/transfer", response_model=ApplicationResponse, summary="转让应用")
- def transfer_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- req: ApplicationTransferRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 将应用转让给其他开发者或超级管理员。
- 需要验证:目标用户手机号、当前用户密码、短信验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
- # Check ownership
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # 1. Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"转让应用失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- # 2. Verify SMS Code
- if not SmsService.verify_code(current_user.mobile, req.verification_code):
- logger.warning(f"转让应用失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- # 3. Verify Target User
- target_user = db.query(User).filter(User.mobile == req.target_mobile, User.is_deleted == 0).first()
- if not target_user:
- raise HTTPException(status_code=404, detail="目标用户不存在")
-
- if target_user.status != "ACTIVE":
- raise HTTPException(status_code=400, detail="目标用户状态不正常")
-
- if target_user.role not in ["DEVELOPER", "SUPER_ADMIN"]:
- raise HTTPException(status_code=400, detail="目标用户必须是开发者或超级管理员")
-
- if target_user.id == app.owner_id:
- raise HTTPException(status_code=400, detail="应用已归属于该用户")
- # 4. Transfer
- old_owner_id = app.owner_id
- app.owner_id = target_user.id
-
- db.add(app)
- db.commit()
- db.refresh(app)
-
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.TRANSFER,
- target_user_id=target_user.id,
- target_mobile=target_user.mobile,
- details={
- "old_owner_id": old_owner_id,
- "new_owner_id": target_user.id
- }
- )
-
- logger.info(f"应用转让成功: {app.app_name} 从 {current_user.mobile} 转让给 {target_user.mobile}")
-
- return app
- # ==========================================
- # Mappings
- # ==========================================
- @router.get("/{app_id}/mappings", response_model=MappingList, summary="获取应用映射列表")
- def read_mappings(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- skip: int = 0,
- limit: int = 10,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用的账号映射列表。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- query = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id)
- total = query.count()
- mappings = query.order_by(desc(AppUserMapping.id)).offset(skip).limit(limit).all()
-
- # Enrich with user mobile (handled by ORM relation usually, but for Pydantic 'from_attributes')
- # We added `user_mobile` to MappingResponse, so we need to ensure it's populated.
- # The ORM `mapping.user` is lazy loaded, which is fine for sync code.
- result = []
- for m in mappings:
- result.append(MappingResponse(
- id=m.id,
- app_id=m.app_id,
- user_id=m.user_id,
- mapped_key=m.mapped_key,
- mapped_email=m.mapped_email,
- user_mobile=m.user.mobile if m.user else "Deleted User",
- user_status=m.user.status if m.user else "DELETED",
- is_active=m.is_active
- ))
-
- return {"total": total, "items": result}
- @router.post("/{app_id}/mappings", response_model=MappingResponse, summary="创建映射")
- def create_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_in: MappingCreate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 手动创建映射。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Verify Password
- if not security.verify_password(mapping_in.password, current_user.password_hash):
- logger.warning(f"创建映射失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- # Normalize input: treat empty strings as None to avoid unique constraint violations
- mapped_key = mapping_in.mapped_key if mapping_in.mapped_key else None
- mapped_email = mapping_in.mapped_email if mapping_in.mapped_email else None
- # 1. Find User or Create
- user = db.query(User).filter(User.mobile == mapping_in.mobile, User.is_deleted == 0).first()
- new_user_created = False
- generated_password = None
- if not user:
- # Auto create user
- password_plain = security.generate_alphanumeric_password(8) # Random password letters+digits
- random_suffix = security.generate_alphanumeric_password(6)
- user = User(
- mobile=mapping_in.mobile,
- password_hash=security.get_password_hash(password_plain),
- status="ACTIVE",
- role="ORDINARY_USER",
- name=f"用户{random_suffix}",
- english_name=mapped_key
- )
- db.add(user)
- db.commit()
- db.refresh(user)
- new_user_created = True
- generated_password = password_plain
- logger.info(f"自动创建用户: {user.mobile}")
- # 2. Check if mapping exists
- existing = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.user_id == user.id
- ).first()
- if existing:
- raise HTTPException(status_code=400, detail="该用户的映射已存在")
- # 3. Check Uniqueness for mapped_email (if provided)
- if mapped_email:
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_email == mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")
- # 4. Check Uniqueness for mapped_key
- if mapped_key:
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_key == mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")
-
- # 5. Create
- mapping = AppUserMapping(
- app_id=app_id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email
- )
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
-
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.MANUAL_ADD,
- target_user_id=user.id,
- target_mobile=user.mobile,
- details={
- "mapped_key": mapped_key,
- "mapped_email": mapped_email,
- "new_user_created": new_user_created
- }
- )
-
- logger.info(f"映射创建成功: App {app_id} -> User {user.mobile} ({mapped_key})")
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- new_user_created=new_user_created,
- generated_password=generated_password
- )
- @router.put("/{app_id}/mappings/{mapping_id}", response_model=MappingResponse, summary="更新映射")
- def update_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_id: int,
- mapping_in: MappingUpdate,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 更新映射信息。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Verify Password
- if not security.verify_password(mapping_in.password, current_user.password_hash):
- logger.warning(f"更新映射失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.id == mapping_id,
- AppUserMapping.app_id == app_id
- ).first()
- if not mapping:
- raise HTTPException(status_code=404, detail="映射未找到")
- # Check Uniqueness for mapped_key
- if mapping_in.mapped_key is not None and mapping_in.mapped_key != mapping.mapped_key:
- if mapping_in.mapped_key:
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_key == mapping_in.mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapping_in.mapped_key} 已被使用")
- # Check Uniqueness for mapped_email
- if mapping_in.mapped_email is not None and mapping_in.mapped_email != mapping.mapped_email:
- if mapping_in.mapped_email:
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.mapped_email == mapping_in.mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapping_in.mapped_email} 已被使用")
- # Capture old values for logging
- old_key = mapping.mapped_key
- old_email = mapping.mapped_email
- if mapping_in.mapped_key is not None:
- mapping.mapped_key = mapping_in.mapped_key
- if mapping_in.mapped_email is not None:
- mapping.mapped_email = mapping_in.mapped_email
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.UPDATE,
- target_user_id=mapping.user_id,
- target_mobile=mapping.user.mobile if mapping.user else None,
- details={
- "old": {"mapped_key": old_key, "mapped_email": old_email},
- "new": {"mapped_key": mapping.mapped_key, "mapped_email": mapping.mapped_email}
- }
- )
-
- logger.info(f"映射更新成功: App {app_id} User {mapping.user.mobile if mapping.user else 'unknown'}")
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=mapping.user.mobile if mapping.user else "Deleted User",
- user_status=mapping.user.status if mapping.user else "DELETED",
- is_active=mapping.is_active
- )
- @router.delete("/{app_id}/mappings/{mapping_id}", summary="删除映射")
- def delete_mapping(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- mapping_id: int,
- req: MappingDelete,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 删除映射关系。需验证密码。
- """
- # Verify Password
- if not security.verify_password(req.password, current_user.password_hash):
- logger.warning(f"删除映射失败: 密码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=403, detail="密码错误")
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.id == mapping_id,
- AppUserMapping.app_id == app_id
- ).first()
- if not mapping:
- raise HTTPException(status_code=404, detail="映射未找到")
-
- app = db.query(Application).filter(Application.id == app_id).first()
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Capture for logging
- target_user_id = mapping.user_id
- target_mobile = mapping.user.mobile if mapping.user else None
-
- db.delete(mapping)
- db.commit()
-
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.DELETE,
- target_user_id=target_user_id,
- target_mobile=target_mobile,
- details={"mapping_id": mapping_id}
- )
-
- logger.info(f"映射删除成功: App {app_id}, Mapping {mapping_id}")
-
- return {"message": "删除成功"}
- @router.post("/{app_id}/sync-users", summary="同步所有用户")
- def sync_users_to_app(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 一键导入用户管理中的用户数据到应用映射中。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # Get all active users
- users = db.query(User).filter(User.is_deleted == 0).all()
-
- # Get existing mappings (user_ids)
- existing_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
- mapped_user_ids = {m.user_id for m in existing_mappings}
-
- new_mappings = []
- logger.info(f"开始同步用户到应用 {app.app_name} (ID: {app_id})")
-
- for user in users:
- if user.id in mapped_user_ids:
- continue
-
- # Create mapping
- mapped_key = user.english_name if user.english_name else user.mobile
-
- mapping = AppUserMapping(
- app_id=app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=None,
- is_active=True
- )
- new_mappings.append(mapping)
- if new_mappings:
- try:
- db.bulk_save_objects(new_mappings)
- db.commit()
- logger.info(f"用户同步完成: 新增 {len(new_mappings)} 条映射")
- except Exception as e:
- db.rollback()
- logger.error(f"批量同步用户失败: {e}。尝试逐条插入。")
-
- # Fallback: try one by one
- success_count = 0
- for m in new_mappings:
- try:
- db.add(m)
- db.commit()
- success_count += 1
- except Exception as ex:
- db.rollback()
- logger.warning(f"单个用户映射失败 (User: {m.user_id}): {ex}")
-
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details={"message": "Sync all users (partial)", "attempted": len(new_mappings), "success": success_count}
- )
- return {"message": f"同步完成,成功 {success_count} 个,失败 {len(new_mappings) - success_count} 个 (可能是账号冲突)"}
-
- # Log success
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details={"message": "Sync all users", "count": len(new_mappings)}
- )
- return {"message": f"同步成功,新增 {len(new_mappings)} 个用户映射"}
- logger.info("用户同步: 没有需要同步的新用户")
- return {"message": "没有需要同步的用户"}
- @router.post("/{app_id}/sync-users-v2", summary="同步用户 (新版)")
- def sync_users_to_app_v2(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- sync_req: AppSyncRequest,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 高级用户同步功能。
- 支持全量/部分同步,以及可选的默认邮箱初始化。
- 需要手机验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- # 1. Verify SMS Code
- if not SmsService.verify_code(current_user.mobile, sync_req.verification_code):
- logger.warning(f"同步用户失败: 验证码错误 (User: {current_user.mobile})")
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- # 2. Determine Target Users
- query = db.query(User).filter(User.is_deleted == 0)
-
- if sync_req.mode == "SELECTED":
- if not sync_req.user_ids:
- raise HTTPException(status_code=400, detail="请选择要同步的用户")
- query = query.filter(User.id.in_(sync_req.user_ids))
-
- users = query.all()
-
- if not users:
- return {"message": "没有找到可同步的用户"}
- # 3. Get existing mappings (user_ids) to skip
- existing_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
- mapped_user_ids = {m.user_id for m in existing_mappings}
-
- # Check if email domain is valid format if provided (simple check)
- if sync_req.init_email and not sync_req.email_domain:
- raise HTTPException(status_code=400, detail="开启邮箱初始化时必须填写域名")
- new_mappings = []
-
- for user in users:
- if user.id in mapped_user_ids:
- continue
-
- mapped_key = user.english_name if user.english_name else user.mobile
-
- mapped_email = None
- if sync_req.init_email and user.english_name:
- # Construct email
- domain = sync_req.email_domain.strip()
- if not domain.startswith("@"):
- domain = "@" + domain
- mapped_email = f"{user.english_name}{domain}"
-
- mapping = AppUserMapping(
- app_id=app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email,
- is_active=True
- )
- new_mappings.append(mapping)
- if not new_mappings:
- return {"message": "所有选中的用户均已存在映射,无需同步"}
- # 4. Insert
- logger.info(f"开始同步(v2)用户到应用 {app.app_name},计划新增 {len(new_mappings)} 条")
-
- success_count = 0
- fail_count = 0
-
- for m in new_mappings:
- try:
- # Additional check: uniqueness of mapped_key in this app
- db.add(m)
- db.commit()
- success_count += 1
- except Exception as e:
- db.rollback()
- fail_count += 1
- logger.warning(f"同步单个映射失败 (User: {m.user_id}): {e}")
-
- # 5. Log
- LogService.create_log(
- db=db,
- app_id=app.id,
- operator_id=current_user.id,
- action_type=ActionType.SYNC,
- details={
- "mode": sync_req.mode,
- "init_email": sync_req.init_email,
- "total_attempted": len(new_mappings),
- "success": success_count,
- "failed": fail_count
- }
- )
-
- logger.info(f"同步(v2)完成。成功: {success_count}, 失败: {fail_count}")
- msg = f"同步完成。成功: {success_count},失败: {fail_count}"
- if fail_count > 0:
- msg += " (失败原因可能是账号或邮箱冲突)"
-
- return {"message": msg}
- @router.get("/{app_id}/mappings/export", summary="导出映射")
- def export_mappings(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 导出所有映射到 Excel (.xlsx)。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all()
-
- # Prepare data for DataFrame
- data = []
- for m in mappings:
- mobile = m.user.mobile if m.user else "Deleted User"
- data.append({
- '手机号': mobile,
- '映射账号': m.mapped_key,
- '映射邮箱': m.mapped_email or ''
- })
-
- # Create DataFrame
- df = pd.DataFrame(data)
-
- # If no data, create an empty DataFrame with columns
- if not data:
- df = pd.DataFrame(columns=['手机号', '映射账号', '映射邮箱'])
- # Write to Excel BytesIO
- output = io.BytesIO()
- with pd.ExcelWriter(output, engine='openpyxl') as writer:
- df.to_excel(writer, index=False)
-
- output.seek(0)
-
- filename = f"mappings_app_{app_id}.xlsx"
- logger.info(f"导出映射成功: App {app_id}, Count {len(mappings)}")
-
- return StreamingResponse(
- output,
- media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- headers={"Content-Disposition": f"attachment; filename={filename}"}
- )
- @router.post("/{app_id}/mapping/preview", response_model=MappingPreviewResponse, summary="预览映射导入")
- async def preview_mapping(
- app_id: int,
- file: UploadFile = File(...),
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 预览 Excel/CSV 映射导入。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- contents = await file.read()
- filename = file.filename
- try:
- return MappingService.preview_import(db, app_id, contents, filename)
- except Exception as e:
- logger.error(f"导入预览失败: {e}", exc_info=True)
- raise HTTPException(status_code=400, detail=f"解析文件失败: {str(e)}")
- @router.post("/send-import-verification-code", summary="发送导入验证码")
- def send_import_verification_code(
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 发送验证码给当前登录用户(用于敏感操作验证,如导入)。
- """
- SmsService.send_code(current_user.mobile)
- logger.info(f"发送导入验证码: {current_user.mobile}")
- return {"message": "验证码已发送"}
- @router.post("/{app_id}/mapping/import", response_model=ImportLogResponse, summary="执行映射导入")
- async def import_mapping(
- app_id: int,
- file: UploadFile = File(...),
- strategy: MappingStrategy = Form(MappingStrategy.SKIP),
- verification_code: str = Form(...),
- db: Session = Depends(deps.get_db),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 执行映射导入操作。需要验证短信验证码。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- contents = await file.read()
- filename = file.filename
-
- logger.info(f"开始执行映射导入: App {app_id}, File {filename}, Strategy {strategy}")
- try:
- result = MappingService.execute_import(db, app_id, contents, filename, strategy, current_user.mobile, verification_code)
- except Exception as e:
- logger.error(f"执行映射导入异常: {e}", exc_info=True)
- raise e
-
- # LOGGING
- # For import, we log the summary and the logs structure
- LogService.create_log(
- db=db,
- app_id=app_id,
- operator_id=current_user.id,
- action_type=ActionType.IMPORT,
- details=result.model_dump(mode='json') # Store full result including logs
- )
-
- logger.info(f"映射导入完成: 成功 {result.summary.inserted + result.summary.updated}, 失败 {result.summary.failed}")
-
- return result
- @router.get("/mapping/users", response_model=UserSyncList, summary="获取全量用户(M2M)")
- def get_all_users_m2m(
- *,
- db: Session = Depends(deps.get_db),
- skip: int = 0,
- limit: int = 100,
- current_app: Application = Depends(deps.get_current_app),
- ):
- """
- 开发者拉取全量用户接口。
- 仅返回:手机号、姓名、英文名。
- 需要应用访问令牌 (Authorization Bearer JWT 或 X-App-Access-Token)。
- """
- query = db.query(User).filter(User.is_deleted == 0)
-
- total = query.count()
- users = query.order_by(User.id).offset(skip).limit(limit).all()
-
- return {"total": total, "items": users}
- @router.post("/mapping/sync", response_model=MappingResponse, summary="同步映射 (M2M)")
- def sync_mapping(
- *,
- db: Session = Depends(deps.get_db),
- sync_in: UserSyncRequest,
- current_app: Application = Depends(deps.get_current_app),
- ):
- """
- 从外部平台同步用户映射关系(机器对机器)。
- 支持增删改查:
- - UPSERT (默认): 创建或更新映射及用户。
- - DELETE: 仅删除应用与用户的映射关系,不删除用户。
- 需要应用访问令牌 (Authorization Bearer JWT 或 X-App-Access-Token)。
- """
- # Normalize input: treat empty strings as None
- mapped_key = sync_in.mapped_key if sync_in.mapped_key else None
- mapped_email = sync_in.mapped_email if sync_in.mapped_email else None
- in_name = sync_in.name if sync_in.name else None
- in_english_name = sync_in.english_name if sync_in.english_name else None
- logger.info(f"收到 M2M 同步请求: App {current_app.app_id}, Mobile {sync_in.mobile}, Action {sync_in.sync_action}")
- # ==========================================
- # 1. Handle DELETE Action
- # ==========================================
- if sync_in.sync_action == "DELETE":
- # 查找用户
- user = db.query(User).filter(User.mobile == sync_in.mobile).first()
- if not user:
- # 用户不存在,无法删除映射,直接抛出404或视作成功
- logger.warning(f"M2M 删除失败: 用户 {sync_in.mobile} 不存在")
- raise HTTPException(status_code=404, detail="用户不存在")
- # 查找映射
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.user_id == user.id
- ).first()
- if not mapping:
- logger.warning(f"M2M 删除失败: 映射不存在 (User {sync_in.mobile})")
- raise HTTPException(status_code=404, detail="映射关系不存在")
- # 构造返回数据(删除前快照,将状态置为 False)
- resp_data = MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- is_active=False # 标记为非活跃/已删除
- )
- # 执行物理删除 (只删映射,不删用户)
- db.delete(mapping)
- db.commit()
- # 记录日志
- LogService.create_log(
- db=db,
- app_id=current_app.id,
- operator_id=current_app.owner_id,
- action_type=ActionType.DELETE,
- target_user_id=user.id,
- target_mobile=user.mobile,
- details={"mapped_key": mapping.mapped_key, "action": "M2M_DELETE"}
- )
- logger.info(f"M2M 删除成功: {sync_in.mobile}")
- return resp_data
- # ==========================================
- # 2. Handle UPSERT Action (Existing Logic)
- # ==========================================
- # 0. Check Uniqueness for Name and English Name (Global Check)
- # We exclude the current user (by mobile) to allow updates to self without conflict
- if in_name:
- name_conflict = db.query(User).filter(
- User.name == in_name,
- User.mobile != sync_in.mobile
- ).first()
- if name_conflict:
- raise HTTPException(status_code=400, detail=f"姓名 '{in_name}' 已存在")
- if in_english_name:
- en_name_conflict = db.query(User).filter(
- User.english_name == in_english_name,
- User.mobile != sync_in.mobile
- ).first()
- if en_name_conflict:
- raise HTTPException(status_code=400, detail=f"英文名 '{in_english_name}' 已存在")
- # 1. Find User or Create
- user = db.query(User).filter(User.mobile == sync_in.mobile).first()
- new_user_created = False
-
- if not user:
- # Create New User
-
- # Auto-generate English name if missing but Chinese name is present
- if in_name and not in_english_name:
- in_english_name = generate_english_name(in_name)
- # Validation: Name and English Name are required for new users
- if not in_name or not in_english_name:
- raise HTTPException(status_code=400, detail="新建用户必须提供姓名和英文名称")
- # Auto create user
- password = security.generate_alphanumeric_password(8) # Random password letters+digits
-
- user = User(
- mobile=sync_in.mobile,
- password_hash=security.get_password_hash(password),
- status="ACTIVE",
- role="ORDINARY_USER",
- name=in_name,
- english_name=in_english_name
- )
- db.add(user)
- db.commit()
- db.refresh(user)
- new_user_created = True
- logger.info(f"M2M 自动创建用户: {sync_in.mobile}")
- else:
- # Update Existing User (if fields provided)
- updated = False
- if in_name is not None and user.name != in_name:
- user.name = in_name
- updated = True
-
- if in_english_name is not None and user.english_name != in_english_name:
- user.english_name = in_english_name
- updated = True
-
- if updated:
- db.add(user)
- db.commit()
- db.refresh(user)
- # 2. Handle Mapping
- mapping = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.user_id == user.id
- ).first()
- # Check Uniqueness for mapped_key (if changing or new, and provided)
- if mapped_key and (not mapping or mapping.mapped_key != mapped_key):
- key_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.mapped_key == mapped_key
- ).first()
- if key_exists:
- raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用")
- # Check Uniqueness for mapped_email (if changing or new, and provided)
- if mapped_email and (not mapping or mapping.mapped_email != mapped_email):
- email_exists = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == current_app.id,
- AppUserMapping.mapped_email == mapped_email
- ).first()
- if email_exists:
- raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用")
- new_mapping_created = False
- if mapping:
- # Update existing mapping
- if sync_in.mapped_key is not None:
- mapping.mapped_key = mapped_key
- if sync_in.is_active is not None:
- mapping.is_active = sync_in.is_active
- if sync_in.mapped_email is not None:
- mapping.mapped_email = mapped_email
- else:
- # Create new mapping
- new_mapping_created = True
- mapping = AppUserMapping(
- app_id=current_app.id,
- user_id=user.id,
- mapped_key=mapped_key,
- mapped_email=mapped_email,
- is_active=sync_in.is_active if sync_in.is_active is not None else True
- )
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
- # LOGGING
- LogService.create_log(
- db=db,
- app_id=current_app.id,
- operator_id=current_app.owner_id,
- action_type=ActionType.SYNC_M2M,
- target_user_id=user.id,
- target_mobile=user.mobile,
- details={
- "mapped_key": mapped_key,
- "mapped_email": mapped_email,
- "new_user_created": new_user_created,
- "new_mapping_created": new_mapping_created
- }
- )
-
- logger.info(f"M2M 同步成功: {sync_in.mobile} (Mapping: {mapping.id})")
- return MappingResponse(
- id=mapping.id,
- app_id=mapping.app_id,
- user_id=mapping.user_id,
- mapped_key=mapping.mapped_key,
- mapped_email=mapping.mapped_email,
- user_mobile=user.mobile,
- user_status=user.status,
- is_active=mapping.is_active
- )
- # ==========================================
- # Operation Logs
- # ==========================================
- @router.get("/{app_id}/logs", response_model=OperationLogList, summary="获取操作日志")
- def read_logs(
- *,
- db: Session = Depends(deps.get_db),
- app_id: int,
- skip: int = 0,
- limit: int = 20,
- action_type: ActionType = Query(None),
- keyword: str = Query(None, description="搜索手机号"),
- start_date: datetime = Query(None),
- end_date: datetime = Query(None),
- current_user: User = Depends(deps.get_current_active_user),
- ):
- """
- 获取应用操作日志。
- """
- app = db.query(Application).filter(Application.id == app_id).first()
- if not app:
- raise HTTPException(status_code=404, detail="应用未找到")
-
- if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="权限不足")
- total, logs = LogService.get_logs(
- db=db,
- app_id=app_id,
- skip=skip,
- limit=limit,
- action_type=action_type,
- keyword=keyword,
- start_date=start_date,
- end_date=end_date
- )
-
- result = []
- for log in logs:
- # Enrich operator mobile
- operator_mobile = log.operator.mobile if log.operator else "Unknown"
-
- result.append(OperationLogResponse(
- id=log.id,
- app_id=log.app_id,
- action_type=log.action_type,
- target_mobile=log.target_mobile,
- details=log.details,
- operator_id=log.operator_id,
- operator_mobile=operator_mobile,
- target_user_id=log.target_user_id,
- created_at=log.created_at
- ))
-
- return {"total": total, "items": result}
|