import secrets import string import io import csv import pandas as pd import logging import json from typing import List, Optional from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, Response, UploadFile, File, Form, Query, Request from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from sqlalchemy import desc, or_, func from app.api.v1 import deps from app.core import security from app.models.application import Application, ProtocolType from app.models.user import User, UserStatus from app.models.mapping import AppUserMapping from app.models.app_category import AppCategory from app.core.utils import generate_english_name, get_client_ip from app.schemas.application import ( ApplicationCreate, ApplicationUpdate, ApplicationResponse, ApplicationList, ApplicationSecretDisplay, ViewSecretRequest, RegenerateSecretRequest, ApplicationTransferRequest, AppSyncRequest, CategoryStats, AppCategoryCreate, AppCategoryUpdate, AppCategoryResponse, BatchUpdateCategoryRequest ) from app.schemas.mapping import ( MappingList, MappingResponse, MappingCreate, MappingUpdate, MappingDelete, MappingPreviewResponse, MappingImportSummary, MappingStrategy, ImportLogResponse ) from app.schemas.user import UserSyncRequest, UserSyncList from app.services.mapping_service import MappingService from app.services.sms_service import SmsService from app.services.log_service import LogService from app.services.hydra_service import hydra_service from app.schemas.operation_log import ActionType, OperationLogList, OperationLogResponse router = APIRouter() logger = logging.getLogger(__name__) def generate_access_token(): return secrets.token_urlsafe(32) def generate_app_credentials(): # Generate a random 16-char App ID (hex or alphanumeric) app_id = "app_" + secrets.token_hex(8) # Generate a strong 32-char App Secret alphabet = string.ascii_letters + string.digits app_secret = ''.join(secrets.choice(alphabet) for i in range(32)) return app_id, app_secret @router.get("/", response_model=ApplicationList, summary="获取应用列表") def read_apps( skip: int = 0, limit: int = 10, search: str = None, category_id: int = None, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """ 获取应用列表(分页)。 超级管理员可以查看所有,开发者只能查看自己的应用。 category_id: - None: 获取所有应用 - 0: 获取未分类的应用(category_id IS NULL) - >0: 获取指定分类的应用 """ query = db.query(Application).filter(Application.is_deleted == False) if current_user.role != "SUPER_ADMIN": query = query.filter(Application.owner_id == current_user.id) # 按分类筛选 if category_id is not None: if category_id == 0: # 未分类的应用 query = query.filter(Application.category_id.is_(None)) else: # 指定分类的应用 query = query.filter(Application.category_id == category_id) if search: # Search by name or app_id query = query.filter( or_( Application.app_name.ilike(f"%{search}%"), Application.app_id.ilike(f"%{search}%") ) ) total = query.count() apps = query.order_by(desc(Application.id)).offset(skip).limit(limit).all() # 为每个应用添加 category_name items = [] for app in apps: app_dict = { **{c.name: getattr(app, c.name) for c in app.__table__.columns}, "category_name": app.category.name if app.category else None } items.append(ApplicationResponse(**app_dict)) return {"total": total, "items": items} @router.get("/categories", response_model=List[CategoryStats], summary="获取所有应用分类") def get_categories( db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), search: str = None, ): """ 获取所有应用分类及其统计信息。 包括:预设分类 + 已使用的自定义分类 超级管理员查看所有,开发者只查看自己应用使用的分类。 """ try: logger.info(f"[分类管理] 请求开始 - 用户ID: {current_user.id}, 角色: {current_user.role}, 搜索参数: {repr(search)}") # 获取预设分类 preset_query = db.query(AppCategory).order_by(AppCategory.name) if search and search.strip(): preset_query = preset_query.filter(AppCategory.name.ilike(f"%{search.strip()}%")) preset_categories = preset_query.all() logger.info(f"[分类管理] 预设分类数量: {len(preset_categories)}") # 统计每个预设分类的应用数量 category_list = [] for cat in preset_categories: query = db.query(func.count(Application.id)).filter( Application.is_deleted == False, Application.category_id == cat.id ) if current_user.role != "SUPER_ADMIN": query = query.filter(Application.owner_id == current_user.id) app_count = query.scalar() or 0 category_list.append(CategoryStats( category_id=cat.id, category_name=cat.name, app_count=app_count )) logger.info(f"[分类管理] 最终返回数据数量: {len(category_list)}") return category_list except Exception as e: logger.error(f"[分类管理] 发生未捕获的异常: {e}", exc_info=True) raise @router.get("/categories/{category_id}/apps", response_model=ApplicationList, summary="获取分类下的应用列表") def get_category_apps( category_id: int, skip: int = 0, limit: int = 100, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """获取指定分类下的所有应用""" category = db.query(AppCategory).filter(AppCategory.id == category_id).first() if not category: raise HTTPException(status_code=404, detail="分类不存在") query = db.query(Application).filter( Application.is_deleted == False, Application.category_id == category_id ) if current_user.role != "SUPER_ADMIN": query = query.filter(Application.owner_id == current_user.id) total = query.count() apps = query.order_by(desc(Application.id)).offset(skip).limit(limit).all() # 为每个应用添加 category_name items = [] for app in apps: app_dict = { **{c.name: getattr(app, c.name) for c in app.__table__.columns}, "category_name": app.category.name if app.category else None } items.append(ApplicationResponse(**app_dict)) return {"total": total, "items": items} @router.post("/categories/batch-update", summary="批量更新应用分类") def batch_update_app_category( req: BatchUpdateCategoryRequest, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """批量更新应用的分类""" if not req.password: raise HTTPException(status_code=400, detail="需要提供管理员密码") if not security.verify_password(req.password, current_user.password_hash): raise HTTPException(status_code=400, detail="密码错误") if not req.app_ids: raise HTTPException(status_code=400, detail="请选择要更新的应用") # 验证分类是否存在(如果提供了 category_id) if req.category_id: category = db.query(AppCategory).filter(AppCategory.id == req.category_id).first() if not category: raise HTTPException(status_code=404, detail="分类不存在") # 查询要更新的应用 query = db.query(Application).filter( Application.id.in_(req.app_ids), Application.is_deleted == False ) if current_user.role != "SUPER_ADMIN": query = query.filter(Application.owner_id == current_user.id) apps = query.all() if len(apps) != len(req.app_ids): raise HTTPException(status_code=403, detail="部分应用不存在或无权限") # 批量更新分类 updated_count = 0 for app in apps: app.category_id = req.category_id db.add(app) updated_count += 1 db.commit() action = f"设置为分类ID {req.category_id}" if req.category_id else "移除分类" logger.info(f"批量更新应用分类成功: {updated_count}个应用 {action} (Operator: {current_user.mobile})") return {"message": f"成功更新 {updated_count} 个应用的分类", "count": updated_count} @router.delete("/categories/{category_id}", summary="删除分类") def delete_category( category_id: int, verification_code: str = Query(..., description="手机验证码"), db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """ 删除分类。 注意:必须先移出该分类下的所有应用才能删除分类。 需要手机验证码验证。 """ if not SmsService.verify_code(current_user.mobile, verification_code): raise HTTPException(status_code=400, detail="验证码无效或已过期") category = db.query(AppCategory).filter(AppCategory.id == category_id).first() if not category: raise HTTPException(status_code=404, detail="分类不存在") # 检查是否有应用使用此分类 apps_count = db.query(Application).filter( Application.is_deleted == False, Application.category_id == category_id ).count() if apps_count > 0: raise HTTPException( status_code=400, detail=f"无法删除:有 {apps_count} 个应用正在使用此分类。请先移出这些应用后再删除分类。" ) db.delete(category) db.commit() logger.info(f"分类删除成功: {category.name} (Operator: {current_user.mobile})") return {"message": f"成功删除分类 '{category.name}'"} # ========================================== # Preset Category Management # ========================================== @router.get("/preset-categories", response_model=List[AppCategoryResponse], summary="获取预设分类列表") def get_preset_categories( db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """获取所有预设分类""" categories = db.query(AppCategory).order_by(AppCategory.name).all() return categories @router.post("/preset-categories", response_model=AppCategoryResponse, summary="创建预设分类") def create_preset_category( category_in: AppCategoryCreate, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """创建预设分类(仅超级管理员)""" if current_user.role != "SUPER_ADMIN": raise HTTPException(status_code=403, detail="权限不足") # 检查分类名是否已存在 existing = db.query(AppCategory).filter(AppCategory.name == category_in.name).first() if existing: raise HTTPException(status_code=400, detail=f"分类 '{category_in.name}' 已存在") category = AppCategory(**category_in.model_dump()) db.add(category) db.commit() db.refresh(category) logger.info(f"预设分类创建成功: {category_in.name} (Operator: {current_user.mobile})") return category @router.put("/preset-categories/{category_id}", response_model=AppCategoryResponse, summary="更新预设分类") def update_preset_category( category_id: int, category_in: AppCategoryUpdate, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """更新预设分类(仅超级管理员,需要密码验证)""" if current_user.role != "SUPER_ADMIN": raise HTTPException(status_code=403, detail="权限不足") # 密码验证 if not category_in.password: raise HTTPException(status_code=400, detail="需要提供密码") from app.core import security if not security.verify_password(category_in.password, current_user.password_hash): logger.warning(f"预设分类更新失败: 密码错误 (User: {current_user.mobile})") raise HTTPException(status_code=403, detail="密码错误") category = db.query(AppCategory).filter(AppCategory.id == category_id).first() if not category: raise HTTPException(status_code=404, detail="预设分类未找到") # 如果更新名称,检查是否重复 if category_in.name and category_in.name != category.name: existing = db.query(AppCategory).filter(AppCategory.name == category_in.name).first() if existing: raise HTTPException(status_code=400, detail=f"分类 '{category_in.name}' 已存在") update_data = category_in.model_dump(exclude_unset=True) # 移除密码字段,不保存到数据库 update_data.pop('password', None) for field, value in update_data.items(): setattr(category, field, value) db.commit() db.refresh(category) logger.info(f"预设分类更新成功: {category.name} (Operator: {current_user.mobile})") return category @router.delete("/preset-categories/{category_id}", summary="删除预设分类") def delete_preset_category( category_id: int, db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """删除预设分类(仅超级管理员)""" if current_user.role != "SUPER_ADMIN": raise HTTPException(status_code=403, detail="权限不足") category = db.query(AppCategory).filter(AppCategory.id == category_id).first() if not category: raise HTTPException(status_code=404, detail="预设分类未找到") # 检查是否有应用使用此分类 apps_count = db.query(Application).filter( Application.category_id == category.id, Application.is_deleted == False ).count() if apps_count > 0: raise HTTPException( status_code=400, detail=f"无法删除:有 {apps_count} 个应用正在使用此分类" ) db.delete(category) db.commit() logger.info(f"预设分类删除成功: {category.name} (Operator: {current_user.mobile})") return {"message": "删除成功"} @router.get("/{app_id}", response_model=ApplicationResponse, summary="获取单个应用详情") def read_app( *, db: Session = Depends(deps.get_db), app_id: int, current_user: User = Depends(deps.get_current_active_user), ): """ 获取单个应用详情。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") # 设置 category_name app_dict = { **{c.name: getattr(app, c.name) for c in app.__table__.columns}, "category_name": app.category.name if app.category else None } return ApplicationResponse(**app_dict) @router.post("/", response_model=ApplicationSecretDisplay, summary="创建应用") def create_app( *, db: Session = Depends(deps.get_db), app_in: ApplicationCreate, current_user: User = Depends(deps.get_current_active_user), ): """ 创建新应用。只会返回一次明文密钥。 """ # 1. Generate ID and Secret app_id, app_secret = generate_app_credentials() # 2. Generate Access Token access_token = generate_access_token() # 3. Store Secret (Plain text needed for HMAC verification) db_app = Application( app_id=app_id, app_secret=app_secret, access_token=access_token, app_name=app_in.app_name, icon_url=app_in.icon_url, protocol_type=app_in.protocol_type, redirect_uris=app_in.redirect_uris, notification_url=app_in.notification_url, description=app_in.description, category_id=app_in.category_id, owner_id=current_user.id # Assign owner ) # 验证 category_id 是否存在 if app_in.category_id: category = db.query(AppCategory).filter(AppCategory.id == app_in.category_id).first() if not category: raise HTTPException(status_code=400, detail=f"分类ID {app_in.category_id} 不存在") db.add(db_app) db.commit() db.refresh(db_app) logger.info(f"应用创建成功: {app_in.app_name} (ID: {app_id}, Owner: {current_user.mobile})") # 如果是 OIDC 应用,自动在 Hydra 中创建 / 更新对应的 OAuth2 Client if db_app.protocol_type == ProtocolType.OIDC: try: raw = db_app.redirect_uris or "" redirect_uris: list[str] = [] if raw: # 1. 优先按 JSON 解析(支持 ["url1","url2"] 或 "url1") try: parsed = json.loads(raw) if isinstance(parsed, list): redirect_uris = [str(u).strip() for u in parsed if str(u).strip()] elif isinstance(parsed, str): if parsed.strip(): redirect_uris = [parsed.strip()] except Exception: # 2. 非 JSON 时,支持逗号分隔或单个 URL parts = [u.strip() for u in raw.split(",") if u.strip()] if parts: redirect_uris = parts hydra_service.create_or_update_client( client_id=db_app.app_id, client_secret=db_app.app_secret, redirect_uris=redirect_uris, client_name=db_app.app_name or db_app.app_id, ) logger.info(f"Hydra OIDC Client 已创建/更新: {db_app.app_id}, redirect_uris={redirect_uris}") except Exception as e: logger.exception( "应用创建成功,但在 Hydra 创建 OIDC Client 失败 (app_id=%s): %s", db_app.app_id, e, ) # 如需强一致,可以在这里 raise HTTPException 中断创建流程 return ApplicationSecretDisplay(app_id=app_id, app_secret=app_secret, access_token=access_token) @router.put("/{app_id}", response_model=ApplicationResponse, summary="更新应用") def update_app( *, db: Session = Depends(deps.get_db), app_id: int, app_in: ApplicationUpdate, current_user: User = Depends(deps.get_current_active_user), ): """ 更新应用信息。需要手机验证码和密码验证。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") # Check ownership if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") # Security Verification if not app_in.verification_code: raise HTTPException(status_code=400, detail="需要提供手机验证码") if not SmsService.verify_code(current_user.mobile, app_in.verification_code): logger.warning(f"应用更新失败: 验证码错误 (User: {current_user.mobile})") raise HTTPException(status_code=400, detail="验证码无效或已过期") update_data = app_in.model_dump(exclude_unset=True) # Remove security fields from update data update_data.pop('password', None) update_data.pop('verification_code', None) # 验证 category_id 是否存在 if 'category_id' in update_data and update_data['category_id']: category = db.query(AppCategory).filter(AppCategory.id == update_data['category_id']).first() if not category: raise HTTPException(status_code=400, detail=f"分类ID {update_data['category_id']} 不存在") for field, value in update_data.items(): setattr(app, field, value) db.add(app) db.commit() db.refresh(app) # 如果是 OIDC 应用,编辑后同步 Hydra 中的 OAuth2 Client if app.protocol_type == ProtocolType.OIDC: try: raw = app.redirect_uris or "" redirect_uris: list[str] = [] if raw: # 1. 优先按 JSON 解析(支持 ["url1","url2"] 或 "url1") try: parsed = json.loads(raw) if isinstance(parsed, list): redirect_uris = [str(u).strip() for u in parsed if str(u).strip()] elif isinstance(parsed, str): if parsed.strip(): redirect_uris = [parsed.strip()] except Exception: # 2. 非 JSON 时,支持逗号分隔或单个 URL parts = [u.strip() for u in raw.split(",") if u.strip()] if parts: redirect_uris = parts hydra_service.create_or_update_client( client_id=app.app_id, client_secret=app.app_secret, redirect_uris=redirect_uris, client_name=app.app_name or app.app_id, ) logger.info(f"Hydra OIDC Client 已在编辑后同步: {app.app_id}, redirect_uris={redirect_uris}") except Exception as e: logger.exception( "应用编辑成功,但在 Hydra 同步 OIDC Client 失败 (app_id=%s): %s", app.app_id, e, ) # 5. Log LogService.create_log( db=db, app_id=app.id, operator_id=current_user.id, action_type=ActionType.UPDATE, details=update_data ) logger.info(f"应用更新成功: {app.app_name} (ID: {app.app_id})") return app @router.delete("/{app_id}", response_model=ApplicationResponse, summary="删除应用") def delete_app( *, db: Session = Depends(deps.get_db), app_id: int, current_user: User = Depends(deps.get_current_active_user), ): """ 软删除应用。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") # Check ownership if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") app.is_deleted = True db.add(app) db.commit() logger.info(f"应用删除成功: {app.app_name} (ID: {app.app_id}, Operator: {current_user.mobile})") return app @router.post("/{app_id}/regenerate-secret", response_model=ApplicationSecretDisplay, summary="重新生成密钥") def regenerate_secret( *, db: Session = Depends(deps.get_db), app_id: int, req: RegenerateSecretRequest, current_user: User = Depends(deps.get_current_active_user), ): """ 重新生成应用密钥。需要手机验证码和密码验证。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") # Check ownership if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") # Security Verification if not security.verify_password(req.password, current_user.password_hash): logger.warning(f"重置密钥失败: 密码错误 (User: {current_user.mobile})") raise HTTPException(status_code=403, detail="密码错误") if not SmsService.verify_code(current_user.mobile, req.verification_code): logger.warning(f"重置密钥失败: 验证码错误 (User: {current_user.mobile})") raise HTTPException(status_code=400, detail="验证码无效或已过期") _, new_secret = generate_app_credentials() app.app_secret = new_secret db.add(app) db.commit() # Log LogService.create_log( db=db, app_id=app.id, operator_id=current_user.id, action_type=ActionType.REGENERATE_SECRET, details={"message": "Regenerated App Secret"} ) logger.info(f"应用密钥已重置: {app.app_name} (ID: {app.app_id})") return ApplicationSecretDisplay(app_id=app.app_id, app_secret=new_secret, access_token=app.access_token) @router.post("/{app_id}/view-secret", response_model=ApplicationSecretDisplay, summary="查看密钥") def view_secret( *, db: Session = Depends(deps.get_db), app_id: int, req: ViewSecretRequest, current_user: User = Depends(deps.get_current_active_user), ): """ 查看应用密钥。需要验证用户密码。 """ # 1. Verify Password if not security.verify_password(req.password, current_user.password_hash): logger.warning(f"查看密钥失败: 密码错误 (User: {current_user.mobile})") raise HTTPException(status_code=403, detail="密码错误") app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") # Check ownership if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") # Log LogService.create_log( db=db, app_id=app.id, operator_id=current_user.id, action_type=ActionType.VIEW_SECRET, details={"message": "Viewed App Secret"} ) logger.info(f"查看应用密钥: {app.app_name} (Operator: {current_user.mobile})") return ApplicationSecretDisplay(app_id=app.app_id, app_secret=app.app_secret, access_token=app.access_token) @router.post("/{app_id}/transfer", response_model=ApplicationResponse, summary="转让应用") def transfer_app( *, db: Session = Depends(deps.get_db), app_id: int, req: ApplicationTransferRequest, current_user: User = Depends(deps.get_current_active_user), ): """ 将应用转让给其他开发者或超级管理员。 需要验证:目标用户手机号、当前用户密码、短信验证码。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") # Check ownership if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") # 1. Verify Password if not security.verify_password(req.password, current_user.password_hash): logger.warning(f"转让应用失败: 密码错误 (User: {current_user.mobile})") raise HTTPException(status_code=403, detail="密码错误") # 2. Verify SMS Code if not SmsService.verify_code(current_user.mobile, req.verification_code): logger.warning(f"转让应用失败: 验证码错误 (User: {current_user.mobile})") raise HTTPException(status_code=400, detail="验证码无效或已过期") # 3. Verify Target User target_user = db.query(User).filter(User.mobile == req.target_mobile, User.is_deleted == 0).first() if not target_user: raise HTTPException(status_code=404, detail="目标用户不存在") if target_user.status != "ACTIVE": raise HTTPException(status_code=400, detail="目标用户状态不正常") if target_user.role not in ["DEVELOPER", "SUPER_ADMIN"]: raise HTTPException(status_code=400, detail="目标用户必须是开发者或超级管理员") if target_user.id == app.owner_id: raise HTTPException(status_code=400, detail="应用已归属于该用户") # 4. Transfer old_owner_id = app.owner_id app.owner_id = target_user.id db.add(app) db.commit() db.refresh(app) # 5. Log LogService.create_log( db=db, app_id=app.id, operator_id=current_user.id, action_type=ActionType.TRANSFER, target_user_id=target_user.id, target_mobile=target_user.mobile, details={ "old_owner_id": old_owner_id, "new_owner_id": target_user.id } ) logger.info(f"应用转让成功: {app.app_name} 从 {current_user.mobile} 转让给 {target_user.mobile}") return app # ========================================== # Mappings # ========================================== @router.get("/{app_id}/mappings", response_model=MappingList, summary="获取应用映射列表") def read_mappings( *, db: Session = Depends(deps.get_db), app_id: int, skip: int = 0, limit: int = 10, search: Optional[str] = Query(None, description="按手机号、映射账号、映射邮箱模糊匹配"), mapping_is_active: Optional[bool] = Query(None, description="映射是否启用,不传不筛选"), user_status: Optional[str] = Query(None, description="统一认证账号状态:ACTIVE|PENDING|DISABLED|DELETED"), current_user: User = Depends(deps.get_current_active_user), ): """ 获取应用的账号映射列表。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") query = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id) if mapping_is_active is not None: query = query.filter(AppUserMapping.is_active == mapping_is_active) need_user_join = bool(search and search.strip()) or bool(user_status and user_status.strip()) if need_user_join: query = query.outerjoin(User, AppUserMapping.user_id == User.id) if search and search.strip(): term = f"%{search.strip()}%" query = query.filter( or_( AppUserMapping.mapped_key.ilike(term), AppUserMapping.mapped_email.ilike(term), User.mobile.ilike(term), ) ) if user_status and user_status.strip(): st = user_status.strip().upper() if st == "DELETED": query = query.filter(User.id.is_(None)) elif st in ("ACTIVE", "PENDING", "DISABLED"): query = query.filter(User.status == UserStatus[st]) else: raise HTTPException(status_code=400, detail="无效的 user_status") total = query.count() mappings = query.order_by(desc(AppUserMapping.id)).offset(skip).limit(limit).all() # Enrich with user mobile (handled by ORM relation usually, but for Pydantic 'from_attributes') # We added `user_mobile` to MappingResponse, so we need to ensure it's populated. # The ORM `mapping.user` is lazy loaded, which is fine for sync code. result = [] for m in mappings: result.append(MappingResponse( id=m.id, app_id=m.app_id, user_id=m.user_id, mapped_key=m.mapped_key, mapped_email=m.mapped_email, user_mobile=m.user.mobile if m.user else "Deleted User", user_status=m.user.status if m.user else "DELETED", is_active=m.is_active )) return {"total": total, "items": result} @router.post("/{app_id}/mappings", response_model=MappingResponse, summary="创建映射") def create_mapping( *, db: Session = Depends(deps.get_db), app_id: int, mapping_in: MappingCreate, current_user: User = Depends(deps.get_current_active_user), ): """ 手动创建映射。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") # Verify Password if not security.verify_password(mapping_in.password, current_user.password_hash): logger.warning(f"创建映射失败: 密码错误 (User: {current_user.mobile})") raise HTTPException(status_code=403, detail="密码错误") # Normalize input: treat empty strings as None to avoid unique constraint violations mapped_key = mapping_in.mapped_key if mapping_in.mapped_key else None mapped_email = mapping_in.mapped_email if mapping_in.mapped_email else None # 1. Find User or Create user = db.query(User).filter(User.mobile == mapping_in.mobile, User.is_deleted == 0).first() new_user_created = False generated_password = None if not user: # Auto create user password_plain = security.generate_alphanumeric_password(8) # Random password letters+digits random_suffix = security.generate_alphanumeric_password(6) user = User( mobile=mapping_in.mobile, password_hash=security.get_password_hash(password_plain), status="ACTIVE", role="ORDINARY_USER", name=f"用户{random_suffix}", english_name=mapped_key ) db.add(user) db.commit() db.refresh(user) new_user_created = True generated_password = password_plain logger.info(f"自动创建用户: {user.mobile}") # 2. Check if mapping exists existing = db.query(AppUserMapping).filter( AppUserMapping.app_id == app_id, AppUserMapping.user_id == user.id ).first() if existing: raise HTTPException(status_code=400, detail="该用户的映射已存在") # 3. Check Uniqueness for mapped_email (if provided) if mapped_email: email_exists = db.query(AppUserMapping).filter( AppUserMapping.app_id == app_id, AppUserMapping.mapped_email == mapped_email ).first() if email_exists: raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用") # 4. Check Uniqueness for mapped_key if mapped_key: key_exists = db.query(AppUserMapping).filter( AppUserMapping.app_id == app_id, AppUserMapping.mapped_key == mapped_key ).first() if key_exists: raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用") # 5. Create mapping = AppUserMapping( app_id=app_id, user_id=user.id, mapped_key=mapped_key, mapped_email=mapped_email ) db.add(mapping) db.commit() db.refresh(mapping) # LOGGING LogService.create_log( db=db, app_id=app_id, operator_id=current_user.id, action_type=ActionType.MANUAL_ADD, target_user_id=user.id, target_mobile=user.mobile, details={ "mapped_key": mapped_key, "mapped_email": mapped_email, "new_user_created": new_user_created } ) logger.info(f"映射创建成功: App {app_id} -> User {user.mobile} ({mapped_key})") return MappingResponse( id=mapping.id, app_id=mapping.app_id, user_id=mapping.user_id, mapped_key=mapping.mapped_key, mapped_email=mapping.mapped_email, user_mobile=user.mobile, user_status=user.status, is_active=mapping.is_active, new_user_created=new_user_created, generated_password=generated_password ) @router.put("/{app_id}/mappings/{mapping_id}", response_model=MappingResponse, summary="更新映射") def update_mapping( *, db: Session = Depends(deps.get_db), app_id: int, mapping_id: int, mapping_in: MappingUpdate, current_user: User = Depends(deps.get_current_active_user), ): """ 更新映射信息。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") # Verify Password if not security.verify_password(mapping_in.password, current_user.password_hash): logger.warning(f"更新映射失败: 密码错误 (User: {current_user.mobile})") raise HTTPException(status_code=403, detail="密码错误") mapping = db.query(AppUserMapping).filter( AppUserMapping.id == mapping_id, AppUserMapping.app_id == app_id ).first() if not mapping: raise HTTPException(status_code=404, detail="映射未找到") # Check Uniqueness for mapped_key if mapping_in.mapped_key is not None and mapping_in.mapped_key != mapping.mapped_key: if mapping_in.mapped_key: key_exists = db.query(AppUserMapping).filter( AppUserMapping.app_id == app_id, AppUserMapping.mapped_key == mapping_in.mapped_key ).first() if key_exists: raise HTTPException(status_code=400, detail=f"该应用下账号 {mapping_in.mapped_key} 已被使用") # Check Uniqueness for mapped_email if mapping_in.mapped_email is not None and mapping_in.mapped_email != mapping.mapped_email: if mapping_in.mapped_email: email_exists = db.query(AppUserMapping).filter( AppUserMapping.app_id == app_id, AppUserMapping.mapped_email == mapping_in.mapped_email ).first() if email_exists: raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapping_in.mapped_email} 已被使用") # Capture old values for logging old_key = mapping.mapped_key old_email = mapping.mapped_email old_is_active = mapping.is_active if mapping_in.mapped_key is not None: mapping.mapped_key = mapping_in.mapped_key if mapping_in.mapped_email is not None: mapping.mapped_email = mapping_in.mapped_email if mapping_in.is_active is not None: mapping.is_active = mapping_in.is_active db.add(mapping) db.commit() db.refresh(mapping) # LOGGING LogService.create_log( db=db, app_id=app_id, operator_id=current_user.id, action_type=ActionType.UPDATE, target_user_id=mapping.user_id, target_mobile=mapping.user.mobile if mapping.user else None, details={ "old": { "mapped_key": old_key, "mapped_email": old_email, "is_active": old_is_active, }, "new": { "mapped_key": mapping.mapped_key, "mapped_email": mapping.mapped_email, "is_active": mapping.is_active, }, } ) logger.info(f"映射更新成功: App {app_id} User {mapping.user.mobile if mapping.user else 'unknown'}") return MappingResponse( id=mapping.id, app_id=mapping.app_id, user_id=mapping.user_id, mapped_key=mapping.mapped_key, mapped_email=mapping.mapped_email, user_mobile=mapping.user.mobile if mapping.user else "Deleted User", user_status=mapping.user.status if mapping.user else "DELETED", is_active=mapping.is_active ) @router.delete("/{app_id}/mappings/{mapping_id}", summary="删除映射") def delete_mapping( *, db: Session = Depends(deps.get_db), app_id: int, mapping_id: int, req: MappingDelete, current_user: User = Depends(deps.get_current_active_user), ): """ 删除映射关系。需验证密码。 """ # Verify Password if not security.verify_password(req.password, current_user.password_hash): logger.warning(f"删除映射失败: 密码错误 (User: {current_user.mobile})") raise HTTPException(status_code=403, detail="密码错误") mapping = db.query(AppUserMapping).filter( AppUserMapping.id == mapping_id, AppUserMapping.app_id == app_id ).first() if not mapping: raise HTTPException(status_code=404, detail="映射未找到") app = db.query(Application).filter(Application.id == app_id).first() if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") # Capture for logging target_user_id = mapping.user_id target_mobile = mapping.user.mobile if mapping.user else None db.delete(mapping) db.commit() # LOGGING LogService.create_log( db=db, app_id=app_id, operator_id=current_user.id, action_type=ActionType.DELETE, target_user_id=target_user_id, target_mobile=target_mobile, details={"mapping_id": mapping_id} ) logger.info(f"映射删除成功: App {app_id}, Mapping {mapping_id}") return {"message": "删除成功"} @router.post("/{app_id}/sync-users", summary="同步所有用户") def sync_users_to_app( *, db: Session = Depends(deps.get_db), app_id: int, current_user: User = Depends(deps.get_current_active_user), ): """ 一键导入用户管理中的用户数据到应用映射中。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") # Get all active users users = db.query(User).filter(User.is_deleted == 0).all() # Get existing mappings (user_ids) existing_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all() mapped_user_ids = {m.user_id for m in existing_mappings} new_mappings = [] logger.info(f"开始同步用户到应用 {app.app_name} (ID: {app_id})") for user in users: if user.id in mapped_user_ids: continue # Create mapping mapped_key = user.english_name if user.english_name else user.mobile mapping = AppUserMapping( app_id=app.id, user_id=user.id, mapped_key=mapped_key, mapped_email=None, is_active=True ) new_mappings.append(mapping) if new_mappings: try: db.bulk_save_objects(new_mappings) db.commit() logger.info(f"用户同步完成: 新增 {len(new_mappings)} 条映射") except Exception as e: db.rollback() logger.error(f"批量同步用户失败: {e}。尝试逐条插入。") # Fallback: try one by one success_count = 0 for m in new_mappings: try: db.add(m) db.commit() success_count += 1 except Exception as ex: db.rollback() logger.warning(f"单个用户映射失败 (User: {m.user_id}): {ex}") LogService.create_log( db=db, app_id=app.id, operator_id=current_user.id, action_type=ActionType.IMPORT, details={"message": "Sync all users (partial)", "attempted": len(new_mappings), "success": success_count} ) return {"message": f"同步完成,成功 {success_count} 个,失败 {len(new_mappings) - success_count} 个 (可能是账号冲突)"} # Log success LogService.create_log( db=db, app_id=app.id, operator_id=current_user.id, action_type=ActionType.IMPORT, details={"message": "Sync all users", "count": len(new_mappings)} ) return {"message": f"同步成功,新增 {len(new_mappings)} 个用户映射"} logger.info("用户同步: 没有需要同步的新用户") return {"message": "没有需要同步的用户"} @router.post("/{app_id}/sync-users-v2", summary="同步用户 (新版)") def sync_users_to_app_v2( *, db: Session = Depends(deps.get_db), app_id: int, sync_req: AppSyncRequest, current_user: User = Depends(deps.get_current_active_user), ): """ 高级用户同步功能。 支持全量/部分同步,以及可选的默认邮箱初始化。 需要手机验证码。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") # 1. Verify SMS Code if not SmsService.verify_code(current_user.mobile, sync_req.verification_code): logger.warning(f"同步用户失败: 验证码错误 (User: {current_user.mobile})") raise HTTPException(status_code=400, detail="验证码无效或已过期") # 2. Determine Target Users query = db.query(User).filter(User.is_deleted == 0) if sync_req.mode == "SELECTED": if not sync_req.user_ids: raise HTTPException(status_code=400, detail="请选择要同步的用户") query = query.filter(User.id.in_(sync_req.user_ids)) users = query.all() if not users: return {"message": "没有找到可同步的用户"} # 3. Get existing mappings (user_ids) to skip existing_mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all() mapped_user_ids = {m.user_id for m in existing_mappings} # Check if email domain is valid format if provided (simple check) if sync_req.init_email and not sync_req.email_domain: raise HTTPException(status_code=400, detail="开启邮箱初始化时必须填写域名") new_mappings = [] for user in users: if user.id in mapped_user_ids: continue mapped_key = user.english_name if user.english_name else user.mobile mapped_email = None if sync_req.init_email and user.english_name: # Construct email domain = sync_req.email_domain.strip() if not domain.startswith("@"): domain = "@" + domain mapped_email = f"{user.english_name}{domain}" mapping = AppUserMapping( app_id=app.id, user_id=user.id, mapped_key=mapped_key, mapped_email=mapped_email, is_active=True ) new_mappings.append(mapping) if not new_mappings: return {"message": "所有选中的用户均已存在映射,无需同步"} # 4. Insert logger.info(f"开始同步(v2)用户到应用 {app.app_name},计划新增 {len(new_mappings)} 条") success_count = 0 fail_count = 0 for m in new_mappings: try: # Additional check: uniqueness of mapped_key in this app db.add(m) db.commit() success_count += 1 except Exception as e: db.rollback() fail_count += 1 logger.warning(f"同步单个映射失败 (User: {m.user_id}): {e}") # 5. Log LogService.create_log( db=db, app_id=app.id, operator_id=current_user.id, action_type=ActionType.SYNC, details={ "mode": sync_req.mode, "init_email": sync_req.init_email, "total_attempted": len(new_mappings), "success": success_count, "failed": fail_count } ) logger.info(f"同步(v2)完成。成功: {success_count}, 失败: {fail_count}") msg = f"同步完成。成功: {success_count},失败: {fail_count}" if fail_count > 0: msg += " (失败原因可能是账号或邮箱冲突)" return {"message": msg} @router.get("/{app_id}/mappings/export", summary="导出映射") def export_mappings( *, db: Session = Depends(deps.get_db), app_id: int, current_user: User = Depends(deps.get_current_active_user), ): """ 导出所有映射到 Excel (.xlsx)。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") mappings = db.query(AppUserMapping).filter(AppUserMapping.app_id == app_id).all() # Prepare data for DataFrame data = [] for m in mappings: mobile = m.user.mobile if m.user else "Deleted User" data.append({ '手机号': mobile, '映射账号': m.mapped_key, '映射邮箱': m.mapped_email or '' }) # Create DataFrame df = pd.DataFrame(data) # If no data, create an empty DataFrame with columns if not data: df = pd.DataFrame(columns=['手机号', '映射账号', '映射邮箱']) # Write to Excel BytesIO output = io.BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: df.to_excel(writer, index=False) output.seek(0) filename = f"mappings_app_{app_id}.xlsx" logger.info(f"导出映射成功: App {app_id}, Count {len(mappings)}") return StreamingResponse( output, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={"Content-Disposition": f"attachment; filename={filename}"} ) @router.post("/{app_id}/mapping/preview", response_model=MappingPreviewResponse, summary="预览映射导入") async def preview_mapping( app_id: int, file: UploadFile = File(...), db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """ 预览 Excel/CSV 映射导入。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") contents = await file.read() filename = file.filename try: return MappingService.preview_import(db, app_id, contents, filename) except Exception as e: logger.error(f"导入预览失败: {e}", exc_info=True) raise HTTPException(status_code=400, detail=f"解析文件失败: {str(e)}") @router.post("/send-import-verification-code", summary="发送导入验证码") def send_import_verification_code( current_user: User = Depends(deps.get_current_active_user), ): """ 发送验证码给当前登录用户(用于敏感操作验证,如导入)。 """ SmsService.send_code(current_user.mobile) logger.info(f"发送导入验证码: {current_user.mobile}") return {"message": "验证码已发送"} @router.post("/{app_id}/mapping/import", response_model=ImportLogResponse, summary="执行映射导入") async def import_mapping( app_id: int, file: UploadFile = File(...), strategy: MappingStrategy = Form(MappingStrategy.SKIP), verification_code: str = Form(...), db: Session = Depends(deps.get_db), current_user: User = Depends(deps.get_current_active_user), ): """ 执行映射导入操作。需要验证短信验证码。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") contents = await file.read() filename = file.filename logger.info(f"开始执行映射导入: App {app_id}, File {filename}, Strategy {strategy}") try: result = MappingService.execute_import(db, app_id, contents, filename, strategy, current_user.mobile, verification_code) except Exception as e: logger.error(f"执行映射导入异常: {e}", exc_info=True) raise e # LOGGING # For import, we log the summary and the logs structure LogService.create_log( db=db, app_id=app_id, operator_id=current_user.id, action_type=ActionType.IMPORT, details=result.model_dump(mode='json') # Store full result including logs ) logger.info(f"映射导入完成: 成功 {result.summary.inserted + result.summary.updated}, 失败 {result.summary.failed}") return result @router.get("/mapping/users", response_model=UserSyncList, summary="获取全量用户(M2M)") def get_all_users_m2m( *, db: Session = Depends(deps.get_db), skip: int = 0, limit: int = 100, current_app: Application = Depends(deps.get_current_app), ): """ 开发者拉取全量用户接口。 仅返回:手机号、姓名、英文名。 需要应用访问令牌 (Authorization Bearer JWT 或 X-App-Access-Token)。 """ query = db.query(User).filter(User.is_deleted == 0) total = query.count() users = query.order_by(User.id).offset(skip).limit(limit).all() return {"total": total, "items": users} @router.post("/mapping/sync", response_model=MappingResponse, summary="同步映射 (M2M)") def sync_mapping( request: Request, *, db: Session = Depends(deps.get_db), sync_in: UserSyncRequest, current_app: Application = Depends(deps.get_current_app), ): """ 从外部平台同步用户映射关系(机器对机器)。 支持增删改查: - UPSERT (默认): 创建或更新映射及用户;必须提供姓名;新建用户时英文名由姓名自动生成并去重。 - DELETE: 仅删除应用与用户的映射关系,不删除用户。 需要应用访问令牌 (Authorization Bearer JWT 或 X-App-Access-Token)。 """ # Normalize input: treat empty strings as None (mobile / mapped_key 已在 Schema 中强制校验) mapped_key = sync_in.mapped_key mapped_email = sync_in.mapped_email if sync_in.mapped_email else None logger.info(f"收到 M2M 同步请求: App {current_app.app_id}, Mobile {sync_in.mobile}, Action {sync_in.sync_action}") # ========================================== # 1. Handle DELETE Action # ========================================== if sync_in.sync_action == "DELETE": # 查找用户 user = db.query(User).filter(User.mobile == sync_in.mobile).first() if not user: # 用户不存在,无法删除映射,直接抛出404或视作成功 logger.warning(f"M2M 删除失败: 用户 {sync_in.mobile} 不存在") raise HTTPException(status_code=404, detail="用户不存在") # 查找映射 mapping = db.query(AppUserMapping).filter( AppUserMapping.app_id == current_app.id, AppUserMapping.user_id == user.id ).first() if not mapping: logger.warning(f"M2M 删除失败: 映射不存在 (User {sync_in.mobile})") raise HTTPException(status_code=404, detail="映射关系不存在") if mapping.mapped_key is None: raise HTTPException( status_code=400, detail="映射记录缺少外部账号,请在平台侧补全后再删除", ) if mapping.mapped_key != mapped_key: raise HTTPException(status_code=400, detail="映射账号与平台记录不一致") # 构造返回数据(删除前快照,将状态置为 False) resp_data = MappingResponse( id=mapping.id, app_id=mapping.app_id, user_id=mapping.user_id, mapped_key=mapping.mapped_key, mapped_email=mapping.mapped_email, user_mobile=user.mobile, user_status=user.status, is_active=False # 标记为非活跃/已删除 ) # 执行物理删除 (只删映射,不删用户) db.delete(mapping) db.commit() # 记录日志 LogService.create_log( db=db, app_id=current_app.id, operator_id=current_app.owner_id, action_type=ActionType.DELETE, target_user_id=user.id, target_mobile=user.mobile, ip_address=get_client_ip(request), details={ "mapped_key": mapping.mapped_key, "action": "M2M_DELETE", "source": "M2M", "sync_action": "DELETE", }, ) logger.info(f"M2M 删除成功: {sync_in.mobile}") return resp_data # ========================================== # 2. Handle UPSERT Action (Existing Logic) # ========================================== in_name = (sync_in.name or "").strip() if not in_name: raise HTTPException(status_code=400, detail="同步操作必须提供姓名") # 0. Check Uniqueness for Name (Global Check) # We exclude the current user (by mobile) to allow updates to self without conflict name_conflict = db.query(User).filter( User.name == in_name, User.mobile != sync_in.mobile, ).first() if name_conflict: raise HTTPException(status_code=400, detail=f"姓名 '{in_name}' 已存在") # 1. Find User or Create user = db.query(User).filter(User.mobile == sync_in.mobile).first() new_user_created = False if not user: # Create New User — english_name 始终由姓名生成并去重(忽略请求中的 english_name) in_english_name = generate_english_name(in_name).strip() if not in_english_name: in_english_name = f"u{sync_in.mobile[-4:]}" original_base = in_english_name counter = 1 while db.query(User).filter( User.english_name == in_english_name, User.is_deleted == 0, ).first(): in_english_name = f"{original_base}{counter}" counter += 1 # Auto create user password = security.generate_alphanumeric_password(8) # Random password letters+digits user = User( mobile=sync_in.mobile, password_hash=security.get_password_hash(password), status="ACTIVE", role="ORDINARY_USER", name=in_name, english_name=in_english_name ) db.add(user) db.commit() db.refresh(user) new_user_created = True logger.info(f"M2M 自动创建用户: {sync_in.mobile}") else: # Update Existing User - 已有用户不允许修改 name、mobile、english_name # Check if trying to modify restricted fields if sync_in.mobile != user.mobile: raise HTTPException(status_code=400, detail="已有用户不允许修改手机号") if (user.name or "").strip() != in_name: raise HTTPException(status_code=400, detail="已有用户不允许修改姓名") # 已有用户的其他字段(如status等)可以更新,但当前M2M接口不涉及 # 这里只处理映射关系的更新 # 2. Handle Mapping mapping = db.query(AppUserMapping).filter( AppUserMapping.app_id == current_app.id, AppUserMapping.user_id == user.id ).first() # Check Uniqueness for mapped_key (if changing or new, and provided) if mapped_key and (not mapping or mapping.mapped_key != mapped_key): key_exists = db.query(AppUserMapping).filter( AppUserMapping.app_id == current_app.id, AppUserMapping.mapped_key == mapped_key ).first() if key_exists: raise HTTPException(status_code=400, detail=f"该应用下账号 {mapped_key} 已被使用") # Check Uniqueness for mapped_email (if changing or new, and provided) if mapped_email and (not mapping or mapping.mapped_email != mapped_email): email_exists = db.query(AppUserMapping).filter( AppUserMapping.app_id == current_app.id, AppUserMapping.mapped_email == mapped_email ).first() if email_exists: raise HTTPException(status_code=400, detail=f"该应用下邮箱 {mapped_email} 已被使用") new_mapping_created = False if mapping: # Update existing mapping mapping.mapped_key = mapped_key if sync_in.is_active is not None: mapping.is_active = sync_in.is_active if sync_in.mapped_email is not None: mapping.mapped_email = mapped_email else: # Create new mapping new_mapping_created = True mapping = AppUserMapping( app_id=current_app.id, user_id=user.id, mapped_key=mapped_key, mapped_email=mapped_email, is_active=sync_in.is_active if sync_in.is_active is not None else True ) db.add(mapping) db.commit() db.refresh(mapping) # LOGGING LogService.create_log( db=db, app_id=current_app.id, operator_id=current_app.owner_id, action_type=ActionType.SYNC_M2M, target_user_id=user.id, target_mobile=user.mobile, ip_address=get_client_ip(request), details={ "mapped_key": mapped_key, "mapped_email": mapped_email, "new_user_created": new_user_created, "new_mapping_created": new_mapping_created, "sync_action": "UPSERT", "source": "M2M", "is_active": mapping.is_active, }, ) logger.info(f"M2M 同步成功: {sync_in.mobile} (Mapping: {mapping.id})") return MappingResponse( id=mapping.id, app_id=mapping.app_id, user_id=mapping.user_id, mapped_key=mapping.mapped_key, mapped_email=mapping.mapped_email, user_mobile=user.mobile, user_status=user.status, is_active=mapping.is_active ) # ========================================== # Operation Logs # ========================================== @router.get("/{app_id}/logs", response_model=OperationLogList, summary="获取操作日志") def read_logs( *, db: Session = Depends(deps.get_db), app_id: int, skip: int = 0, limit: int = 20, action_type: ActionType = Query(None), keyword: str = Query(None, description="搜索手机号"), start_date: datetime = Query(None), end_date: datetime = Query(None), current_user: User = Depends(deps.get_current_active_user), ): """ 获取应用操作日志。 """ app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="应用未找到") if current_user.role != "SUPER_ADMIN" and app.owner_id != current_user.id: raise HTTPException(status_code=403, detail="权限不足") total, logs = LogService.get_logs( db=db, app_id=app_id, skip=skip, limit=limit, action_type=action_type, keyword=keyword, start_date=start_date, end_date=end_date ) result = [] for log in logs: # Enrich operator mobile operator_mobile = log.operator.mobile if log.operator else "Unknown" result.append(OperationLogResponse( id=log.id, app_id=log.app_id, action_type=log.action_type, target_mobile=log.target_mobile, details=log.details, operator_id=log.operator_id, operator_mobile=operator_mobile, target_user_id=log.target_user_id, created_at=log.created_at )) return {"total": total, "items": result}