| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- import io
- import pandas as pd
- from datetime import datetime
- from typing import List, Tuple
- from sqlalchemy.orm import Session
- from fastapi import HTTPException
- from app.models.user import User, UserRole, UserStatus
- from app.models.mapping import AppUserMapping
- from app.schemas.mapping import (
- MappingRowPreview,
- MappingRowStatus,
- MappingPreviewResponse,
- MappingStrategy,
- MappingImportSummary,
- ImportLogResponse,
- ImportLogItem
- )
- from app.core import security
- from app.services.sms_service import SmsService
- class MappingService:
- @staticmethod
- def process_excel_file(file_content: bytes) -> pd.DataFrame:
- """
- Reads excel file bytes into DataFrame.
- Expected columns: 'mobile'/'手机号', 'mapped_key'/'映射账号'
- """
- try:
- # Try reading as Excel first
- try:
- df = pd.read_excel(io.BytesIO(file_content))
- except:
- # Fallback to CSV
- df = pd.read_csv(io.BytesIO(file_content))
- # Normalize headers to support Chinese and English
- # Map: 手机号 -> mobile, 映射账号 -> mapped_key, 映射邮箱 -> mapped_email
- column_map = {
- '手机号': 'mobile',
- '映射账号': 'mapped_key',
- '映射邮箱': 'mapped_email',
- 'user mobile': 'mobile',
- 'mapped key': 'mapped_key',
- 'mapped email': 'mapped_email'
- }
-
- new_columns = []
- for col in df.columns:
- col_str = str(col).strip()
- col_lower = col_str.lower()
- if col_str in column_map:
- new_columns.append(column_map[col_str])
- elif col_lower in column_map:
- new_columns.append(column_map[col_lower])
- else:
- new_columns.append(col_lower)
-
- df.columns = new_columns
-
- return df
- except Exception as e:
- raise ValueError(f"文件格式无效: {str(e)}")
- @staticmethod
- def preview_import(db: Session, app_id: int, file_content: bytes, filename: str = "") -> MappingPreviewResponse:
- df = MappingService.process_excel_file(file_content)
-
- required_cols = {'mobile', 'mapped_key'}
- if not required_cols.issubset(df.columns):
- raise ValueError(f"缺少必要列。请确保文件包含: 手机号(mobile), 映射账号(mapped_key)")
- preview_rows: List[MappingRowPreview] = []
- stats = {"valid": 0, "error": 0, "new": 0, "update": 0}
- # 1. Batch query all users involved
- mobiles = df['mobile'].astype(str).unique().tolist()
- users = db.query(User).filter(User.mobile.in_(mobiles)).all()
- user_map = {u.mobile: u.id for u in users}
- # 2. Batch query existing mappings for this app
- user_ids = list(user_map.values())
- existing_mappings = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.user_id.in_(user_ids)
- ).all()
- mapping_map = {m.user_id: m for m in existing_mappings}
- # 3. Iterate rows
- for index, row in df.iterrows():
- mobile = str(row['mobile']).strip()
- mapped_key = str(row['mapped_key']).strip()
- mapped_email = None
- if 'mapped_email' in df.columns and not pd.isna(row['mapped_email']):
- val = str(row['mapped_email']).strip()
- mapped_email = val if val else None
-
- row_preview = MappingRowPreview(
- row_index=index + 1, # 1-based index for UI
- mobile=mobile,
- mapped_key=mapped_key,
- mapped_email=mapped_email,
- status=MappingRowStatus.ERROR
- )
- if not mobile or not mapped_key or mobile == 'nan' or mapped_key == 'nan':
- row_preview.message = "手机号或映射账号为空"
- stats["error"] += 1
- elif mobile not in user_map:
- row_preview.status = MappingRowStatus.AUTO_CREATE_USER
- row_preview.message = "用户不存在,将自动创建"
- stats["new"] += 1
- stats["valid"] += 1
- else:
- user_id = user_map[mobile]
- row_preview.user_id = user_id
-
- if user_id in mapping_map:
- row_preview.status = MappingRowStatus.UPDATE
- row_preview.message = f"当前映射: {mapping_map[user_id].mapped_key}"
- stats["update"] += 1
- stats["valid"] += 1
- else:
- row_preview.status = MappingRowStatus.NEW
- stats["new"] += 1
- stats["valid"] += 1
-
- preview_rows.append(row_preview)
- return MappingPreviewResponse(
- total_rows=len(df),
- valid_count=stats["valid"],
- error_count=stats["error"],
- new_count=stats["new"],
- update_count=stats["update"],
- preview_rows=preview_rows
- )
- @staticmethod
- def execute_import(
- db: Session,
- app_id: int,
- file_content: bytes,
- filename: str,
- strategy: MappingStrategy,
- current_user_mobile: str,
- verification_code: str
- ) -> ImportLogResponse:
-
- # 0. Verify SMS Code
- if not SmsService.verify_code(current_user_mobile, verification_code):
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
- preview = MappingService.preview_import(db, app_id, file_content, filename)
-
- summary = MappingImportSummary(total_processed=0, inserted=0, updated=0, failed=0)
- logs: List[ImportLogItem] = []
- import_time = datetime.now()
-
- for row in preview.preview_rows:
- summary.total_processed += 1
-
- log_item = ImportLogItem(
- import_time=import_time,
- mobile=row.mobile,
- mapped_key=row.mapped_key,
- mapped_email=row.mapped_email,
- status="Failed",
- message="",
- is_unified_account_created=False
- )
- if row.status == MappingRowStatus.ERROR:
- summary.failed += 1
- log_item.status = "Failed"
- log_item.message = row.message or "数据错误"
- logs.append(log_item)
- continue
-
- try:
- if row.status == MappingRowStatus.NEW or row.status == MappingRowStatus.AUTO_CREATE_USER:
- user_id = row.user_id
-
- if row.status == MappingRowStatus.AUTO_CREATE_USER:
- # Create User
- pwd = security.generate_alphanumeric_password(8)
- new_user = User(
- mobile=row.mobile,
- password_hash=security.get_password_hash(pwd),
- status=UserStatus.ACTIVE,
- role=UserRole.ORDINARY_USER
- )
- db.add(new_user)
- db.flush() # flush to get ID
- user_id = new_user.id
-
- log_item.is_unified_account_created = True
- log_item.generated_password = pwd
-
- # Check uniqueness before insert (Concurrency edge case or just double check)
- # We rely on unique constraint but catching it is cleaner
-
- new_mapping = AppUserMapping(
- app_id=app_id,
- user_id=user_id,
- mapped_key=row.mapped_key,
- mapped_email=row.mapped_email
- )
- db.add(new_mapping)
- summary.inserted += 1
- log_item.status = "Success"
- log_item.message = "映射创建成功"
-
- elif row.status == MappingRowStatus.UPDATE:
- if strategy == MappingStrategy.OVERWRITE:
- # Find and update
- existing = db.query(AppUserMapping).filter(
- AppUserMapping.app_id == app_id,
- AppUserMapping.user_id == row.user_id
- ).first()
- if existing:
- existing.mapped_key = row.mapped_key
- existing.mapped_email = row.mapped_email
- db.add(existing)
- summary.updated += 1
- log_item.status = "Success"
- log_item.message = "映射更新成功"
- else:
- # SKIP
- log_item.status = "Skipped"
- log_item.message = "映射已存在 (已跳过)"
-
- except Exception as e:
- summary.failed += 1
- log_item.status = "Failed"
- log_item.message = str(e)
-
- logs.append(log_item)
-
- db.commit()
- return ImportLogResponse(summary=summary, logs=logs)
|