import io
from datetime import datetime
from typing import Dict, List, Tuple

import pandas as pd
from fastapi import HTTPException
from sqlalchemy.orm import Session

from app.core import security
from app.models.mapping import AppUserMapping
from app.models.user import User, UserRole, UserStatus
from app.schemas.mapping import (
    ImportLogItem,
    ImportLogResponse,
    MappingImportSummary,
    MappingPreviewResponse,
    MappingRowPreview,
    MappingRowStatus,
    MappingStrategy,
)
from app.services.sms_service import SmsService


class MappingService:
    """Import per-app account mappings (mobile -> mapped_key / mapped_email)
    from an uploaded Excel/CSV file.

    Workflow: ``preview_import`` dry-runs the file and classifies each row;
    ``execute_import`` re-validates, requires an SMS verification code, and
    writes the mappings (auto-creating unified users where needed).
    """

    # Header aliases: Chinese and English headers both normalize to the
    # canonical column names 'mobile', 'mapped_key', 'mapped_email'.
    _COLUMN_MAP = {
        '手机号': 'mobile',
        '映射账号': 'mapped_key',
        '映射邮箱': 'mapped_email',
        'user mobile': 'mobile',
        'mapped key': 'mapped_key',
        'mapped email': 'mapped_email',
    }

    @staticmethod
    def _normalize_mobile(value) -> str:
        """Render a mobile cell as a clean string.

        Excel commonly parses a numeric mobile column as float, so str()
        yields e.g. '13800138000.0'; strip that artifact (and whitespace)
        so the value matches what is stored on ``User.mobile``.  A NaN
        cell still renders as 'nan', which callers reject explicitly.
        """
        s = str(value).strip()
        if s.endswith('.0'):
            s = s[:-2]
        return s

    @staticmethod
    def process_excel_file(file_content: bytes) -> pd.DataFrame:
        """Read uploaded bytes into a DataFrame with normalized headers.

        Tries Excel first and falls back to CSV on failure.  Known headers
        are mapped via ``_COLUMN_MAP`` (Chinese or English); unrecognized
        headers are simply lower-cased.

        Raises:
            ValueError: if the content is neither readable Excel nor CSV.
        """
        try:
            try:
                df = pd.read_excel(io.BytesIO(file_content))
            except Exception:
                # Not a valid workbook -- retry the same bytes as CSV.
                df = pd.read_csv(io.BytesIO(file_content))

            new_columns = []
            for col in df.columns:
                col_str = str(col).strip()
                col_lower = col_str.lower()
                if col_str in MappingService._COLUMN_MAP:
                    new_columns.append(MappingService._COLUMN_MAP[col_str])
                elif col_lower in MappingService._COLUMN_MAP:
                    new_columns.append(MappingService._COLUMN_MAP[col_lower])
                else:
                    new_columns.append(col_lower)
            df.columns = new_columns
            return df
        except Exception as e:
            raise ValueError(f"文件格式无效: {str(e)}") from e

    @staticmethod
    def preview_import(db: Session, app_id: int, file_content: bytes,
                       filename: str = "") -> MappingPreviewResponse:
        """Dry-run the import: classify every row without writing anything.

        Each row becomes ERROR (missing data), AUTO_CREATE_USER (mobile not
        found), UPDATE (mapping already exists for this app) or NEW.

        Args:
            db: active database session (read-only here).
            app_id: application the mappings belong to.
            file_content: raw uploaded bytes (Excel or CSV).
            filename: original file name, kept for interface compatibility.

        Raises:
            ValueError: unreadable file or missing required columns.
        """
        df = MappingService.process_excel_file(file_content)

        required_cols = {'mobile', 'mapped_key'}
        if not required_cols.issubset(df.columns):
            raise ValueError("缺少必要列。请确保文件包含: 手机号(mobile), 映射账号(mapped_key)")

        preview_rows: List[MappingRowPreview] = []
        stats = {"valid": 0, "error": 0, "new": 0, "update": 0}

        # 1. Batch-query all users referenced by the file.  Mobiles are
        # normalized the same way as in the per-row loop below so both
        # lookups agree (previously the batch list was not stripped, so a
        # padded or float-formatted mobile never matched an existing user).
        mobiles = list({MappingService._normalize_mobile(m) for m in df['mobile']})
        users = db.query(User).filter(User.mobile.in_(mobiles)).all()
        user_map = {u.mobile: u.id for u in users}

        # 2. Batch-query existing mappings for this app.
        user_ids = list(user_map.values())
        existing_mappings = db.query(AppUserMapping).filter(
            AppUserMapping.app_id == app_id,
            AppUserMapping.user_id.in_(user_ids)
        ).all()
        mapping_map = {m.user_id: m for m in existing_mappings}

        # 3. Classify each row.
        for index, row in df.iterrows():
            mobile = MappingService._normalize_mobile(row['mobile'])
            mapped_key = str(row['mapped_key']).strip()

            mapped_email = None
            if 'mapped_email' in df.columns and not pd.isna(row['mapped_email']):
                val = str(row['mapped_email']).strip()
                mapped_email = val if val else None

            row_preview = MappingRowPreview(
                row_index=index + 1,  # 1-based index for UI
                mobile=mobile,
                mapped_key=mapped_key,
                mapped_email=mapped_email,
                status=MappingRowStatus.ERROR
            )

            # str(NaN) renders as 'nan', hence the explicit sentinel check.
            if not mobile or not mapped_key or mobile == 'nan' or mapped_key == 'nan':
                row_preview.message = "手机号或映射账号为空"
                stats["error"] += 1
            elif mobile not in user_map:
                row_preview.status = MappingRowStatus.AUTO_CREATE_USER
                row_preview.message = "用户不存在,将自动创建"
                stats["new"] += 1
                stats["valid"] += 1
            else:
                user_id = user_map[mobile]
                row_preview.user_id = user_id
                if user_id in mapping_map:
                    row_preview.status = MappingRowStatus.UPDATE
                    row_preview.message = f"当前映射: {mapping_map[user_id].mapped_key}"
                    stats["update"] += 1
                    stats["valid"] += 1
                else:
                    row_preview.status = MappingRowStatus.NEW
                    stats["new"] += 1
                    stats["valid"] += 1

            preview_rows.append(row_preview)

        return MappingPreviewResponse(
            total_rows=len(df),
            valid_count=stats["valid"],
            error_count=stats["error"],
            new_count=stats["new"],
            update_count=stats["update"],
            preview_rows=preview_rows
        )

    @staticmethod
    def execute_import(
        db: Session,
        app_id: int,
        file_content: bytes,
        filename: str,
        strategy: MappingStrategy,
        current_user_mobile: str,
        verification_code: str
    ) -> ImportLogResponse:
        """Apply the import after SMS verification, committing at the end.

        Re-runs ``preview_import`` for classification, then inserts/updates
        mappings row by row.  ``strategy`` controls whether existing
        mappings are overwritten or skipped.  Per-row failures are recorded
        in the log and do not abort the whole import.

        Raises:
            HTTPException: 400 when the SMS verification code is invalid.
            ValueError: propagated from preview on a malformed file.
        """
        # 0. Require a valid SMS code before any destructive operation.
        if not SmsService.verify_code(current_user_mobile, verification_code):
            raise HTTPException(status_code=400, detail="验证码无效或已过期")

        preview = MappingService.preview_import(db, app_id, file_content, filename)

        summary = MappingImportSummary(total_processed=0, inserted=0, updated=0, failed=0)
        logs: List[ImportLogItem] = []
        import_time = datetime.now()

        # Users created during this run, keyed by mobile, so a mobile that
        # appears on several rows is created only once instead of tripping
        # the unique constraint on User.mobile mid-import.
        created_users: Dict[str, int] = {}

        for row in preview.preview_rows:
            summary.total_processed += 1
            log_item = ImportLogItem(
                import_time=import_time,
                mobile=row.mobile,
                mapped_key=row.mapped_key,
                mapped_email=row.mapped_email,
                status="Failed",
                message="",
                is_unified_account_created=False
            )

            if row.status == MappingRowStatus.ERROR:
                summary.failed += 1
                log_item.status = "Failed"
                log_item.message = row.message or "数据错误"
                logs.append(log_item)
                continue

            try:
                if row.status in (MappingRowStatus.NEW, MappingRowStatus.AUTO_CREATE_USER):
                    user_id = row.user_id

                    if row.status == MappingRowStatus.AUTO_CREATE_USER:
                        if row.mobile in created_users:
                            # Same mobile already created by an earlier row
                            # of this file; reuse it.
                            user_id = created_users[row.mobile]
                        else:
                            pwd = security.generate_alphanumeric_password(8)
                            new_user = User(
                                mobile=row.mobile,
                                password_hash=security.get_password_hash(pwd),
                                status=UserStatus.ACTIVE,
                                role=UserRole.ORDINARY_USER
                            )
                            db.add(new_user)
                            db.flush()  # flush to obtain the generated ID
                            user_id = new_user.id
                            created_users[row.mobile] = user_id
                            log_item.is_unified_account_created = True
                            log_item.generated_password = pwd

                    # Mapping uniqueness per (app_id, user_id) is enforced by
                    # the DB constraint; a violation surfaces in the except
                    # block below rather than being pre-checked here.
                    new_mapping = AppUserMapping(
                        app_id=app_id,
                        user_id=user_id,
                        mapped_key=row.mapped_key,
                        mapped_email=row.mapped_email
                    )
                    db.add(new_mapping)
                    summary.inserted += 1
                    log_item.status = "Success"
                    log_item.message = "映射创建成功"

                elif row.status == MappingRowStatus.UPDATE:
                    if strategy == MappingStrategy.OVERWRITE:
                        existing = db.query(AppUserMapping).filter(
                            AppUserMapping.app_id == app_id,
                            AppUserMapping.user_id == row.user_id
                        ).first()
                        if existing:
                            existing.mapped_key = row.mapped_key
                            existing.mapped_email = row.mapped_email
                            db.add(existing)
                            summary.updated += 1
                            log_item.status = "Success"
                            log_item.message = "映射更新成功"
                    else:
                        # SKIP strategy: leave the existing mapping untouched.
                        log_item.status = "Skipped"
                        log_item.message = "映射已存在 (已跳过)"

            except Exception as e:
                # NOTE(review): after a flush failure the session may need a
                # rollback before further work; per-row SAVEPOINTs
                # (db.begin_nested()) would make this fully robust — confirm
                # against the session configuration.
                summary.failed += 1
                log_item.status = "Failed"
                log_item.message = str(e)

            logs.append(log_item)

        db.commit()
        return ImportLogResponse(summary=summary, logs=logs)