import io
import pandas as pd
import logging
from datetime import datetime
from typing import List, Optional, Tuple
from sqlalchemy.orm import Session
from fastapi import HTTPException
from app.models.user import User, UserRole, UserStatus
from app.models.mapping import AppUserMapping
from app.schemas.mapping import (
    MappingRowPreview,
    MappingRowStatus,
    MappingPreviewResponse,
    MappingStrategy,
    MappingImportSummary,
    ImportLogResponse,
    ImportLogItem
)
from app.core import security
from app.services.sms_service import SmsService

logger = logging.getLogger(__name__)


class MappingService:
    """Preview and execute bulk imports of app-user mappings from uploaded files."""

    # Accepted header spellings (Chinese and English) -> canonical column name.
    _COLUMN_ALIASES = {
        '手机号': 'mobile',
        '映射账号': 'mapped_key',
        '映射邮箱': 'mapped_email',
        'user mobile': 'mobile',
        'mapped key': 'mapped_key',
        'mapped email': 'mapped_email'
    }

    @staticmethod
    def _normalize_header(col) -> str:
        """Map one raw column header to its canonical (lower-case) name."""
        aliases = MappingService._COLUMN_ALIASES
        name = str(col).strip()
        if name in aliases:
            return aliases[name]
        lowered = name.lower()
        return aliases.get(lowered, lowered)

    @staticmethod
    def process_excel_file(file_content: bytes) -> pd.DataFrame:
        """
        Read uploaded file bytes into a DataFrame with normalized headers.

        The bytes are parsed as Excel first and as CSV if that fails.
        Expected columns: 'mobile'/'手机号', 'mapped_key'/'映射账号',
        optionally 'mapped_email'/'映射邮箱'.

        All cells are read as text (``dtype=str``) so that mobile numbers are
        not turned into ints/floats (which would render as "13800138000.0"
        or drop leading zeros and then never match ``User.mobile``).

        Raises:
            ValueError: if the content can be parsed neither as Excel nor CSV.
        """
        try:
            try:
                df = pd.read_excel(io.BytesIO(file_content), dtype=str)
            except Exception:
                # Not a readable workbook: fall back to CSV.
                df = pd.read_csv(io.BytesIO(file_content), dtype=str)

            df.columns = [MappingService._normalize_header(col) for col in df.columns]
            return df
        except Exception as e:
            logger.error(f"文件解析失败: {e}", exc_info=True)
            raise ValueError(f"文件格式无效: {str(e)}") from e

    @staticmethod
    def preview_import(db: Session, app_id: int, file_content: bytes, filename: str = "") -> MappingPreviewResponse:
        """
        Parse the file and classify every row without writing to the database.

        Each row becomes ERROR (missing mobile/mapped_key), AUTO_CREATE_USER
        (no user with that mobile), UPDATE (user already mapped for this app)
        or NEW (user exists, no mapping yet).

        Raises:
            ValueError: if the file is unreadable or lacks the required columns.
        """
        try:
            df = MappingService.process_excel_file(file_content)

            required_cols = {'mobile', 'mapped_key'}
            if not required_cols.issubset(df.columns):
                raise ValueError("缺少必要列。请确保文件包含: 手机号(mobile), 映射账号(mapped_key)")

            preview_rows: List[MappingRowPreview] = []
            stats = {"valid": 0, "error": 0, "new": 0, "update": 0}
            has_email_col = 'mapped_email' in df.columns

            # 1. Batch query all users involved. The lookup keys are stripped
            #    exactly like the per-row values below so both sides agree.
            mobiles = list(dict.fromkeys(str(value).strip() for value in df['mobile']))
            users = db.query(User).filter(User.mobile.in_(mobiles)).all() if mobiles else []
            user_map = {u.mobile: u.id for u in users}

            # 2. Batch query existing mappings for this app
            user_ids = list(user_map.values())
            if user_ids:
                existing_mappings = db.query(AppUserMapping).filter(
                    AppUserMapping.app_id == app_id,
                    AppUserMapping.user_id.in_(user_ids)
                ).all()
            else:
                existing_mappings = []
            mapping_map = {m.user_id: m for m in existing_mappings}

            # 3. Classify rows
            for index, row in df.iterrows():
                mobile = str(row['mobile']).strip()
                mapped_key = str(row['mapped_key']).strip()
                mapped_email = None
                if has_email_col and not pd.isna(row['mapped_email']):
                    mapped_email = str(row['mapped_email']).strip() or None

                row_preview = MappingRowPreview(
                    row_index=index + 1,  # 1-based index for UI
                    mobile=mobile,
                    mapped_key=mapped_key,
                    mapped_email=mapped_email,
                    status=MappingRowStatus.ERROR
                )

                # Missing cells stringify to 'nan', hence the explicit check.
                if not mobile or not mapped_key or mobile == 'nan' or mapped_key == 'nan':
                    row_preview.message = "手机号或映射账号为空"
                    stats["error"] += 1
                elif mobile not in user_map:
                    row_preview.status = MappingRowStatus.AUTO_CREATE_USER
                    row_preview.message = "用户不存在,将自动创建"
                    stats["new"] += 1
                    stats["valid"] += 1
                else:
                    user_id = user_map[mobile]
                    row_preview.user_id = user_id
                    if user_id in mapping_map:
                        row_preview.status = MappingRowStatus.UPDATE
                        row_preview.message = f"当前映射: {mapping_map[user_id].mapped_key}"
                        stats["update"] += 1
                    else:
                        row_preview.status = MappingRowStatus.NEW
                        stats["new"] += 1
                    stats["valid"] += 1

                preview_rows.append(row_preview)

            logger.info(f"导入预览完成: App {app_id}, 文件 {filename}, 行数 {len(df)}")

            return MappingPreviewResponse(
                total_rows=len(df),
                valid_count=stats["valid"],
                error_count=stats["error"],
                new_count=stats["new"],
                update_count=stats["update"],
                preview_rows=preview_rows
            )
        except Exception as e:
            logger.error(f"导入预览异常: {e}", exc_info=True)
            raise

    @staticmethod
    def _apply_row(
        db: Session,
        app_id: int,
        row: MappingRowPreview,
        strategy: MappingStrategy
    ) -> Tuple[str, Optional[str]]:
        """
        Stage the database changes for one non-error preview row.

        Returns ``(outcome, generated_password)`` where outcome is one of
        "inserted", "updated", "skipped" or "ignored" (unrecognized status),
        and generated_password is set only when a new user was created.
        """
        if row.status in (MappingRowStatus.NEW, MappingRowStatus.AUTO_CREATE_USER):
            user_id = row.user_id
            generated_password = None
            if row.status == MappingRowStatus.AUTO_CREATE_USER:
                generated_password = security.generate_alphanumeric_password(8)
                # Random suffix for the placeholder display name
                random_suffix = security.generate_alphanumeric_password(6)
                new_user = User(
                    mobile=row.mobile,
                    password_hash=security.get_password_hash(generated_password),
                    status=UserStatus.ACTIVE,
                    role=UserRole.ORDINARY_USER,
                    name=f"用户{random_suffix}",
                    english_name=row.mapped_key
                )
                db.add(new_user)
                db.flush()  # flush to get ID
                user_id = new_user.id

            db.add(AppUserMapping(
                app_id=app_id,
                user_id=user_id,
                mapped_key=row.mapped_key,
                mapped_email=row.mapped_email
            ))
            return "inserted", generated_password

        if row.status == MappingRowStatus.UPDATE:
            if strategy != MappingStrategy.OVERWRITE:
                return "skipped", None
            existing = db.query(AppUserMapping).filter(
                AppUserMapping.app_id == app_id,
                AppUserMapping.user_id == row.user_id
            ).first()
            if existing is None:
                # The mapping seen by the preview is gone; report it
                # instead of silently doing nothing.
                raise LookupError("映射记录不存在,无法更新")
            existing.mapped_key = row.mapped_key
            existing.mapped_email = row.mapped_email
            return "updated", None

        return "ignored", None

    @staticmethod
    def execute_import(
        db: Session,
        app_id: int,
        file_content: bytes,
        filename: str,
        strategy: MappingStrategy,
        current_user_mobile: str,
        verification_code: str
    ) -> ImportLogResponse:
        """
        Verify the SMS code, re-run the preview and apply every valid row.

        Each row is applied inside its own SAVEPOINT so that a failing row
        (e.g. a unique-constraint violation) is rolled back on its own and
        does not leave the session unusable for the following rows or for
        the final commit.

        Raises:
            HTTPException: 400 if the verification code is invalid or expired.
            ValueError: propagated from the preview for unreadable files.
        """
        # 0. Verify SMS Code
        if not SmsService.verify_code(current_user_mobile, verification_code):
            logger.warning(f"导入执行失败: 验证码错误 (User: {current_user_mobile})")
            raise HTTPException(status_code=400, detail="验证码无效或已过期")

        preview = MappingService.preview_import(db, app_id, file_content, filename)

        summary = MappingImportSummary(total_processed=0, inserted=0, updated=0, failed=0)
        logs: List[ImportLogItem] = []
        import_time = datetime.now()

        for row in preview.preview_rows:
            summary.total_processed += 1
            log_item = ImportLogItem(
                import_time=import_time,
                mobile=row.mobile,
                mapped_key=row.mapped_key,
                mapped_email=row.mapped_email,
                status="Failed",
                message="",
                is_unified_account_created=False
            )

            if row.status == MappingRowStatus.ERROR:
                summary.failed += 1
                log_item.message = row.message or "数据错误"
                logs.append(log_item)
                continue

            try:
                # Leaving the block flushes and releases the savepoint, so
                # constraint errors for this row surface here.
                with db.begin_nested():
                    outcome, generated_password = MappingService._apply_row(
                        db, app_id, row, strategy
                    )
            except Exception as e:
                summary.failed += 1
                log_item.status = "Failed"
                log_item.message = str(e)
                logger.error(f"导入行失败 (Mobile: {row.mobile}): {e}")
            else:
                # Record results only once the row's changes are flushed.
                if outcome == "inserted":
                    if generated_password is not None:
                        log_item.is_unified_account_created = True
                        log_item.generated_password = generated_password
                    summary.inserted += 1
                    log_item.status = "Success"
                    log_item.message = "映射创建成功"
                elif outcome == "updated":
                    summary.updated += 1
                    log_item.status = "Success"
                    log_item.message = "映射更新成功"
                elif outcome == "skipped":
                    log_item.status = "Skipped"
                    log_item.message = "映射已存在 (已跳过)"

            logs.append(log_item)

        try:
            db.commit()
        except Exception as e:
            db.rollback()
            logger.error(f"导入提交失败: {e}", exc_info=True)
            raise

        return ImportLogResponse(summary=summary, logs=logs)