| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- import io
- import pandas as pd
- import random
- import string
- import re
- from xpinyin import Pinyin
- from sqlalchemy.orm import Session
- from sqlalchemy import or_
- from app.models.user import User, UserStatus, UserRole
- from app.models.import_log import ImportLog
- from app.core.security import get_password_hash
- from app.core.utils import generate_english_name
- # from app.core.utils import generate_password
- from typing import List, Dict, Any, Tuple
- from datetime import datetime
- # Helper to generate random password if utils doesn't have one
- def _generate_random_password(length=8):
- chars = string.ascii_letters + string.digits
- return ''.join(random.choice(chars) for _ in range(length))
- def _generate_random_string(length=6):
- chars = string.ascii_lowercase + string.digits
- return ''.join(random.choice(chars) for _ in range(length))
- MOBILE_REGEX = r'^1[3-9]\d{9}$'
class UserImportService:
    """Bulk-import users from an uploaded Excel/CSV file.

    Workflow: ``preview_excel`` returns the first rows so the frontend can
    map columns, then ``process_import`` creates one ``User`` per data row
    and persists the per-row outcome in an ``ImportLog``.
    """

    @staticmethod
    def preview_excel(file_contents: bytes, filename: str) -> Dict[str, Any]:
        """Return up to the first 10 rows of the file for column mapping.

        The file is read without a header row; the frontend decides whether
        row 1 is a header. NaN cells are blanked so the payload serializes
        cleanly.

        Raises:
            ValueError: if pandas cannot parse the file.
        """
        try:
            if filename.endswith('.csv'):
                df = pd.read_csv(io.BytesIO(file_contents), header=None, nrows=10)
            else:
                df = pd.read_excel(io.BytesIO(file_contents), header=None, nrows=10)
            data = df.fillna("").values.tolist()
            return {
                "preview_data": data,
                "filename": filename
            }
        except Exception as e:
            raise ValueError(f"Failed to parse file: {str(e)}")

    @staticmethod
    def _normalize_mobile(raw: str) -> str:
        """Strip whitespace and undo pandas' numeric coercion.

        pandas reads numeric Excel cells as floats, so a mobile cell like
        13812345678 arrives as '13812345678.0' and would always fail the
        format check; drop a trailing '.0' when the rest is all digits.
        """
        text = raw.strip()
        if text.endswith(".0") and text[:-2].isdigit():
            return text[:-2]
        return text

    @staticmethod
    def process_import(
        db: "Session",
        file_contents: bytes,
        filename: str,
        start_row: int,           # 1-based data start row from the frontend
        mapping: Dict[str, int],  # field name -> 0-based column index
        user_id: int
    ) -> "ImportLog":
        """Create users from the file according to the column mapping.

        Each row runs inside a SAVEPOINT (``begin_nested``) so a failing row
        rolls back only its own writes and the batch continues. Rows with a
        missing, malformed, or already-registered mobile fail; missing names
        are auto-generated and duplicate names get a numeric suffix.

        Returns:
            The persisted ``ImportLog``. Its ``result_data`` contains the
            generated plaintext initial passwords — callers must warn the
            user about exposing/retaining them.

        Raises:
            ValueError: if the file cannot be read or ``start_row`` is out of range.
        """
        try:
            if filename.endswith('.csv'):
                df = pd.read_csv(io.BytesIO(file_contents), header=None)
            else:
                df = pd.read_excel(io.BytesIO(file_contents), header=None)
        except Exception as e:
            raise ValueError(f"Failed to read file: {str(e)}")

        data_start_idx = start_row - 1  # convert 1-based UI row to 0-based index
        if data_start_idx < 0 or data_start_idx >= len(df):
            raise ValueError("Invalid start row")
        df_data = df.iloc[data_start_idx:]

        success_count = 0
        fail_count = 0
        results: List[Dict[str, Any]] = []  # per-row outcome, stored on the log

        # Column indices are loop-invariant; resolve them once.
        mobile_col = mapping.get("mobile")
        name_col = mapping.get("name")
        en_name_col = mapping.get("english_name")

        for index, row in df_data.iterrows():
            # iloc slicing preserves the original RangeIndex, so index + 1
            # is the 1-based row number of the source file (for display).
            row_num = index + 1

            try:
                # SAVEPOINT per row: an exception rolls back this row only,
                # keeping earlier successful rows in the session.
                with db.begin_nested():
                    # 1. Extract cell values (None when unmapped or empty).
                    mobile = (UserImportService._normalize_mobile(str(row[mobile_col]))
                              if mobile_col is not None and pd.notna(row[mobile_col]) else None)
                    name = (str(row[name_col]).strip()
                            if name_col is not None and pd.notna(row[name_col]) else None)
                    en_name = (str(row[en_name_col]).strip()
                               if en_name_col is not None and pd.notna(row[en_name_col]) else None)

                    if not mobile:
                        raise ValueError("Mobile is required")
                    if not re.match(MOBILE_REGEX, mobile):
                        raise ValueError(f"Invalid mobile format: {mobile}")

                    # 2. Duplicate mobiles cannot be imported.
                    existing_user = db.query(User).filter(User.mobile == mobile).first()
                    if existing_user:
                        raise ValueError(f"Mobile {mobile} already exists")

                    # 3. Fill in missing names.
                    if name:
                        if not en_name:
                            try:
                                # Surname full pinyin + given-name initials.
                                en_name = generate_english_name(name)
                            except Exception:
                                en_name = f"user_{_generate_random_string(6)}"
                    else:
                        name = f"用户_{_generate_random_string(6)}"
                        if not en_name:
                            en_name = f"user_{_generate_random_string(6)}"

                    # De-duplicate display names with numeric suffixes.
                    final_name = UserImportService._resolve_duplicate_name(db, name)
                    final_en_name = UserImportService._resolve_duplicate_en_name(db, en_name)

                    # 4. Create the user with a generated initial password.
                    plain_password = _generate_random_password()
                    password_hash = get_password_hash(plain_password)

                    new_user = User(
                        mobile=mobile,
                        name=final_name,
                        english_name=final_en_name,
                        password_hash=password_hash,
                        status=UserStatus.ACTIVE,
                        role=UserRole.ORDINARY_USER
                    )
                    db.add(new_user)
                    db.flush()  # surface DB errors while still inside the savepoint

                success_count += 1
                results.append({
                    "row": row_num,
                    "status": "success",
                    "mobile": mobile,
                    "name": final_name,
                    "english_name": final_en_name,
                    "initial_password": plain_password
                })
            except Exception as e:
                fail_count += 1
                results.append({
                    "row": row_num,
                    "status": "fail",
                    "error": str(e)
                })

        # Persist all successful rows.
        db.commit()

        log = ImportLog(
            filename=filename,
            total_count=success_count + fail_count,
            success_count=success_count,
            fail_count=fail_count,
            result_data=results,  # NOTE: contains plaintext passwords — warn the user
            created_by=user_id
        )
        db.add(log)
        db.commit()
        db.refresh(log)

        return log

    @staticmethod
    def _resolve_unique(db: "Session", column, base_name: str) -> str:
        """Return ``base_name`` if no user has it in ``column``; otherwise
        the first unused ``base_name1``, ``base_name2``, ... candidate."""
        if not db.query(User).filter(column == base_name).first():
            return base_name
        i = 1
        while True:
            candidate = f"{base_name}{i}"
            if not db.query(User).filter(column == candidate).first():
                return candidate
            i += 1

    @staticmethod
    def _resolve_duplicate_name(db: "Session", base_name: str) -> str:
        """De-duplicate a display name against ``User.name``."""
        return UserImportService._resolve_unique(db, User.name, base_name)

    @staticmethod
    def _resolve_duplicate_en_name(db: "Session", base_name: str) -> str:
        """De-duplicate an English name against ``User.english_name``."""
        return UserImportService._resolve_unique(db, User.english_name, base_name)
|