import_service.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
import io
import random
import re
import secrets
import string
from datetime import datetime
from typing import Any, Dict, List, Tuple

import pandas as pd
from sqlalchemy import or_
from sqlalchemy.orm import Session
from xpinyin import Pinyin

from app.core.security import get_password_hash
# from app.core.utils import generate_password
from app.models.import_log import ImportLog
from app.models.user import User, UserStatus, UserRole
  15. # Helper to generate random password if utils doesn't have one
  16. def _generate_random_password(length=8):
  17. chars = string.ascii_letters + string.digits
  18. return ''.join(random.choice(chars) for _ in range(length))
  19. def _generate_random_string(length=6):
  20. chars = string.ascii_lowercase + string.digits
  21. return ''.join(random.choice(chars) for _ in range(length))
  22. MOBILE_REGEX = r'^1[3-9]\d{9}$'
  23. class UserImportService:
  24. @staticmethod
  25. def preview_excel(file_contents: bytes, filename: str) -> Dict[str, Any]:
  26. """
  27. Read first few rows of excel to let user map columns.
  28. """
  29. try:
  30. if filename.endswith('.csv'):
  31. df = pd.read_csv(io.BytesIO(file_contents), header=None, nrows=10)
  32. else:
  33. df = pd.read_excel(io.BytesIO(file_contents), header=None, nrows=10)
  34. # Convert to list of lists (handle NaN)
  35. data = df.fillna("").values.tolist()
  36. # Generate default headers like A, B, C... if no header?
  37. # Or just return data and let frontend handle "Row 1 is header" logic.
  38. # Returning raw data is flexible.
  39. return {
  40. "preview_data": data,
  41. "filename": filename
  42. }
  43. except Exception as e:
  44. raise ValueError(f"Failed to parse file: {str(e)}")
  45. @staticmethod
  46. def process_import(
  47. db: Session,
  48. file_contents: bytes,
  49. filename: str,
  50. start_row: int, # 1-based index from frontend
  51. mapping: Dict[str, int], # Field -> Column Index (0-based)
  52. user_id: int
  53. ) -> ImportLog:
  54. """
  55. Process the Excel file based on mapping.
  56. start_row: The row index where data starts (0-based in pandas logic? Frontend usually gives 1-based).
  57. mapping: {"mobile": 0, "name": 1, "english_name": 2}
  58. """
  59. # Load Data
  60. try:
  61. if filename.endswith('.csv'):
  62. df = pd.read_csv(io.BytesIO(file_contents), header=None)
  63. else:
  64. df = pd.read_excel(io.BytesIO(file_contents), header=None)
  65. except Exception as e:
  66. raise ValueError(f"Failed to read file: {str(e)}")
  67. # Adjust start_row to 0-based
  68. data_start_idx = start_row - 1
  69. if data_start_idx < 0 or data_start_idx >= len(df):
  70. raise ValueError("Invalid start row")
  71. # Slice dataframe
  72. df_data = df.iloc[data_start_idx:]
  73. success_count = 0
  74. fail_count = 0
  75. results = [] # Stores {row: 1, status: success/fail, msg: ..., account: ..., password: ...}
  76. # Cache existing mobiles to minimize DB hits (optional, but good for batch)
  77. # For simplicity and correctness with concurrent edits, we check DB row by row or handle IntegrityError.
  78. # But logic requires "Repeated mobile cannot be imported", so check first.
  79. p = Pinyin()
  80. for index, row in df_data.iterrows():
  81. row_num = index + 1 # 1-based row number for display
  82. try:
  83. with db.begin_nested():
  84. # 1. Extract Data
  85. mobile_col = mapping.get("mobile")
  86. name_col = mapping.get("name")
  87. en_name_col = mapping.get("english_name")
  88. mobile = str(row[mobile_col]).strip() if mobile_col is not None and pd.notna(row[mobile_col]) else None
  89. name = str(row[name_col]).strip() if name_col is not None and pd.notna(row[name_col]) else None
  90. en_name = str(row[en_name_col]).strip() if en_name_col is not None and pd.notna(row[en_name_col]) else None
  91. if not mobile:
  92. raise ValueError("Mobile is required")
  93. # Validate Mobile Format
  94. if not re.match(MOBILE_REGEX, mobile):
  95. raise ValueError(f"Invalid mobile format: {mobile}")
  96. # 2. Check Mobile Uniqueness
  97. existing_user = db.query(User).filter(User.mobile == mobile).first()
  98. if existing_user:
  99. raise ValueError(f"Mobile {mobile} already exists")
  100. # 3. Handle Name and English Name Generation
  101. if name:
  102. if not en_name:
  103. # Use surname first letter + given name full pinyin
  104. try:
  105. surname = name[0]
  106. given = name[1:] if len(name) > 1 else ""
  107. s_initial = p.get_initials(surname, '').lower()
  108. g_pinyin = p.get_pinyin(given, '').lower()
  109. en_name = f"{s_initial}{g_pinyin}"
  110. except Exception:
  111. en_name = f"user_{_generate_random_string(6)}"
  112. else:
  113. name = f"用户_{_generate_random_string(6)}"
  114. if not en_name:
  115. en_name = f"user_{_generate_random_string(6)}"
  116. final_name = UserImportService._resolve_duplicate_name(db, name)
  117. final_en_name = UserImportService._resolve_duplicate_en_name(db, en_name)
  118. # 4. Create User
  119. plain_password = _generate_random_password()
  120. password_hash = get_password_hash(plain_password)
  121. new_user = User(
  122. mobile=mobile,
  123. name=final_name,
  124. english_name=final_en_name,
  125. password_hash=password_hash,
  126. status=UserStatus.ACTIVE,
  127. role=UserRole.ORDINARY_USER
  128. )
  129. db.add(new_user)
  130. db.flush()
  131. success_count += 1
  132. results.append({
  133. "row": row_num,
  134. "status": "success",
  135. "mobile": mobile,
  136. "name": final_name,
  137. "english_name": final_en_name,
  138. "initial_password": plain_password
  139. })
  140. except Exception as e:
  141. fail_count += 1
  142. results.append({
  143. "row": row_num,
  144. "status": "fail",
  145. "error": str(e)
  146. })
  147. # Commit successful rows
  148. db.commit()
  149. # Re-calc counts if commit failed totally?
  150. # Let's refine the loop with begin_nested() for robust partial success.
  151. # Save Log
  152. log = ImportLog(
  153. filename=filename,
  154. total_count=success_count + fail_count,
  155. success_count=success_count,
  156. fail_count=fail_count,
  157. result_data=results, # Stores passwords! Warning to user about this.
  158. created_by=user_id
  159. )
  160. db.add(log)
  161. db.commit()
  162. db.refresh(log)
  163. return log
  164. @staticmethod
  165. def _resolve_duplicate_name(db: Session, base_name: str) -> str:
  166. # Simple heuristic: check base_name, then base_name1, base_name2...
  167. # Optimization: Query all matching `base_name%` and find gaps?
  168. # Simpler: Loop.
  169. # Check exact match first
  170. if not db.query(User).filter(User.name == base_name).first():
  171. return base_name
  172. i = 1
  173. while True:
  174. new_name = f"{base_name}{i}"
  175. if not db.query(User).filter(User.name == new_name).first():
  176. return new_name
  177. i += 1
  178. @staticmethod
  179. def _resolve_duplicate_en_name(db: Session, base_name: str) -> str:
  180. if not db.query(User).filter(User.english_name == base_name).first():
  181. return base_name
  182. i = 1
  183. while True:
  184. new_name = f"{base_name}{i}"
  185. if not db.query(User).filter(User.english_name == new_name).first():
  186. return new_name
  187. i += 1