import io
import random  # retained for backward compatibility; password generation now uses `secrets`
import re
import secrets
import string
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
from sqlalchemy import or_
from sqlalchemy.orm import Session

from app.core.security import get_password_hash
from app.models.import_log import ImportLog
from app.models.user import User, UserRole, UserStatus
# from app.core.utils import generate_password

# Chinese mainland mobile number: 11 digits, starting 1[3-9].
MOBILE_REGEX = r'^1[3-9]\d{9}$'
# Compiled once so the per-row validation loop does not re-lookup/compile.
_MOBILE_RE = re.compile(MOBILE_REGEX)


def _generate_random_password(length: int = 8) -> str:
    """Return a random alphanumeric password.

    Uses `secrets` (CSPRNG) rather than `random`, since these are
    real initial credentials handed to users.
    """
    chars = string.ascii_letters + string.digits
    return ''.join(secrets.choice(chars) for _ in range(length))


def _generate_random_string(length: int = 6) -> str:
    """Return a random lowercase-alphanumeric suffix for auto-generated names."""
    chars = string.ascii_lowercase + string.digits
    return ''.join(secrets.choice(chars) for _ in range(length))


def _cell_to_str(value: Any) -> Optional[str]:
    """Normalize a pandas cell to a stripped string, or None when empty/NaN.

    Excel numeric cells arrive as floats (e.g. a mobile number becomes
    13812345678.0); integral floats are routed through int() so the
    digits survive the str() conversion exactly.
    """
    if value is None or pd.isna(value):
        return None
    if isinstance(value, float) and value.is_integer():
        value = int(value)
    text = str(value).strip()
    return text or None


class UserImportService:
    """Batch-import users from an uploaded Excel/CSV file."""

    @staticmethod
    def preview_excel(file_contents: bytes, filename: str) -> Dict[str, Any]:
        """Read the first 10 rows of the uploaded file for column mapping.

        Returns raw rows (NaN replaced by "") and lets the frontend decide
        which row is the header and which column maps to which field.

        Raises:
            ValueError: if the file cannot be parsed.
        """
        try:
            if filename.endswith('.csv'):
                df = pd.read_csv(io.BytesIO(file_contents), header=None, nrows=10)
            else:
                df = pd.read_excel(io.BytesIO(file_contents), header=None, nrows=10)
            return {
                "preview_data": df.fillna("").values.tolist(),
                "filename": filename,
            }
        except Exception as e:
            raise ValueError(f"Failed to parse file: {str(e)}")

    @staticmethod
    def process_import(
        db: Session,
        file_contents: bytes,
        filename: str,
        start_row: int,                 # 1-based row where data starts (from frontend)
        mapping: Dict[str, int],        # field name -> 0-based column index
        user_id: int
    ) -> ImportLog:
        """Import users row by row according to the column mapping.

        Each row runs inside a SAVEPOINT (begin_nested) so a failing row is
        rolled back individually while earlier successes survive; failed rows
        are recorded in the result log instead of aborting the batch.

        mapping example: {"mobile": 0, "name": 1, "english_name": 2}

        Returns:
            The persisted ImportLog (note: result_data includes the generated
            initial passwords — callers must warn the user about this).

        Raises:
            ValueError: if the file is unreadable or start_row is out of range.
        """
        try:
            if filename.endswith('.csv'):
                df = pd.read_csv(io.BytesIO(file_contents), header=None)
            else:
                df = pd.read_excel(io.BytesIO(file_contents), header=None)
        except Exception as e:
            raise ValueError(f"Failed to read file: {str(e)}")

        # Convert the frontend's 1-based start row to a 0-based iloc index.
        data_start_idx = start_row - 1
        if data_start_idx < 0 or data_start_idx >= len(df):
            raise ValueError("Invalid start row")
        df_data = df.iloc[data_start_idx:]

        # Column indices are loop-invariant; resolve them once.
        mobile_col = mapping.get("mobile")
        name_col = mapping.get("name")
        en_name_col = mapping.get("english_name")

        success_count = 0
        fail_count = 0
        results: List[Dict[str, Any]] = []

        for index, row in df_data.iterrows():
            row_num = index + 1  # 1-based row number for display
            try:
                with db.begin_nested():
                    # 1. Extract and normalize cell values.
                    mobile = _cell_to_str(row[mobile_col]) if mobile_col is not None else None
                    name = _cell_to_str(row[name_col]) if name_col is not None else None
                    en_name = _cell_to_str(row[en_name_col]) if en_name_col is not None else None

                    if not mobile:
                        raise ValueError("Mobile is required")
                    if not _MOBILE_RE.match(mobile):
                        raise ValueError(f"Invalid mobile format: {mobile}")

                    # 2. Mobiles must be unique; duplicates are rejected per spec.
                    existing_user = db.query(User).filter(User.mobile == mobile).first()
                    if existing_user:
                        raise ValueError(f"Mobile {mobile} already exists")

                    # 3. Fill in missing names, then de-duplicate by suffixing a counter.
                    if not name:
                        name = f"用户_{_generate_random_string(6)}"
                    final_name = UserImportService._resolve_duplicate_name(db, name)
                    if not en_name:
                        en_name = f"user_{_generate_random_string(6)}"
                    final_en_name = UserImportService._resolve_duplicate_en_name(db, en_name)

                    # 4. Create the user with a generated initial password.
                    plain_password = _generate_random_password()
                    new_user = User(
                        mobile=mobile,
                        name=final_name,
                        english_name=final_en_name,
                        password_hash=get_password_hash(plain_password),
                        status=UserStatus.ACTIVE,
                        role=UserRole.ORDINARY_USER,
                    )
                    db.add(new_user)
                    db.flush()  # surface DB errors inside the savepoint

                success_count += 1
                results.append({
                    "row": row_num,
                    "status": "success",
                    "mobile": mobile,
                    "name": final_name,
                    "english_name": final_en_name,
                    "initial_password": plain_password,
                })
            except Exception as e:
                # Savepoint already rolled back this row; record and continue.
                fail_count += 1
                results.append({
                    "row": row_num,
                    "status": "fail",
                    "error": str(e),
                })

        # Persist all successful rows.
        db.commit()

        log = ImportLog(
            filename=filename,
            total_count=success_count + fail_count,
            success_count=success_count,
            fail_count=fail_count,
            result_data=results,  # contains initial passwords — caller must warn the user
            created_by=user_id,
        )
        db.add(log)
        db.commit()
        db.refresh(log)
        return log

    @staticmethod
    def _resolve_unique(db: Session, column, base_name: str) -> str:
        """Return base_name, or base_name + smallest positive int not yet taken in `column`.

        One query per candidate — acceptable for modest batch sizes.
        """
        if not db.query(User).filter(column == base_name).first():
            return base_name
        i = 1
        while True:
            candidate = f"{base_name}{i}"
            if not db.query(User).filter(column == candidate).first():
                return candidate
            i += 1

    @staticmethod
    def _resolve_duplicate_name(db: Session, base_name: str) -> str:
        """De-duplicate a display name against User.name."""
        return UserImportService._resolve_unique(db, User.name, base_name)

    @staticmethod
    def _resolve_duplicate_en_name(db: Session, base_name: str) -> str:
        """De-duplicate an English name against User.english_name."""
        return UserImportService._resolve_unique(db, User.english_name, base_name)