# import_service.py
import io
import random
import re
import secrets
import string
from datetime import datetime
from typing import Any, Dict, List, Tuple

import pandas as pd
from sqlalchemy import or_
from sqlalchemy.orm import Session
from xpinyin import Pinyin

from app.core.security import get_password_hash
from app.core.utils import generate_english_name
# from app.core.utils import generate_password
from app.models.import_log import ImportLog
from app.models.user import User, UserStatus, UserRole
  16. # Helper to generate random password if utils doesn't have one
  17. def _generate_random_password(length=8):
  18. chars = string.ascii_letters + string.digits
  19. return ''.join(random.choice(chars) for _ in range(length))
  20. def _generate_random_string(length=6):
  21. chars = string.ascii_lowercase + string.digits
  22. return ''.join(random.choice(chars) for _ in range(length))
# Mainland-China mobile number: exactly 11 digits, "1" followed by a
# second digit in 3-9, then nine more digits.
MOBILE_REGEX = r'^1[3-9]\d{9}$'
  24. class UserImportService:
  25. @staticmethod
  26. def preview_excel(file_contents: bytes, filename: str) -> Dict[str, Any]:
  27. """
  28. Read first few rows of excel to let user map columns.
  29. """
  30. try:
  31. if filename.endswith('.csv'):
  32. df = pd.read_csv(io.BytesIO(file_contents), header=None, nrows=10)
  33. else:
  34. df = pd.read_excel(io.BytesIO(file_contents), header=None, nrows=10)
  35. # Convert to list of lists (handle NaN)
  36. data = df.fillna("").values.tolist()
  37. # Generate default headers like A, B, C... if no header?
  38. # Or just return data and let frontend handle "Row 1 is header" logic.
  39. # Returning raw data is flexible.
  40. return {
  41. "preview_data": data,
  42. "filename": filename
  43. }
  44. except Exception as e:
  45. raise ValueError(f"Failed to parse file: {str(e)}")
  46. @staticmethod
  47. def process_import(
  48. db: Session,
  49. file_contents: bytes,
  50. filename: str,
  51. start_row: int, # 1-based index from frontend
  52. mapping: Dict[str, int], # Field -> Column Index (0-based)
  53. user_id: int
  54. ) -> ImportLog:
  55. """
  56. Process the Excel file based on mapping.
  57. start_row: The row index where data starts (0-based in pandas logic? Frontend usually gives 1-based).
  58. mapping: {"mobile": 0, "name": 1, "english_name": 2}
  59. """
  60. # Load Data
  61. try:
  62. if filename.endswith('.csv'):
  63. df = pd.read_csv(io.BytesIO(file_contents), header=None)
  64. else:
  65. df = pd.read_excel(io.BytesIO(file_contents), header=None)
  66. except Exception as e:
  67. raise ValueError(f"Failed to read file: {str(e)}")
  68. # Adjust start_row to 0-based
  69. data_start_idx = start_row - 1
  70. if data_start_idx < 0 or data_start_idx >= len(df):
  71. raise ValueError("Invalid start row")
  72. # Slice dataframe
  73. df_data = df.iloc[data_start_idx:]
  74. success_count = 0
  75. fail_count = 0
  76. results = [] # Stores {row: 1, status: success/fail, msg: ..., account: ..., password: ...}
  77. # Cache existing mobiles to minimize DB hits (optional, but good for batch)
  78. # For simplicity and correctness with concurrent edits, we check DB row by row or handle IntegrityError.
  79. # But logic requires "Repeated mobile cannot be imported", so check first.
  80. p = Pinyin()
  81. for index, row in df_data.iterrows():
  82. row_num = index + 1 # 1-based row number for display
  83. # Extract Data first to check for empty rows
  84. mobile_col = mapping.get("mobile")
  85. name_col = mapping.get("name")
  86. en_name_col = mapping.get("english_name")
  87. mobile = str(row[mobile_col]).strip() if mobile_col is not None and pd.notna(row[mobile_col]) else None
  88. name = str(row[name_col]).strip() if name_col is not None and pd.notna(row[name_col]) else None
  89. if name:
  90. name = name.replace(" ", "")
  91. en_name = str(row[en_name_col]).strip() if en_name_col is not None and pd.notna(row[en_name_col]) else None
  92. # Skip empty rows (often generated by Excel at the end)
  93. if not mobile and not name and not en_name:
  94. continue
  95. try:
  96. with db.begin_nested():
  97. if not mobile:
  98. raise ValueError("Mobile is required")
  99. # Validate Mobile Format
  100. if not re.match(MOBILE_REGEX, mobile):
  101. raise ValueError(f"Invalid mobile format: {mobile}")
  102. # 2. Check Mobile Uniqueness
  103. existing_user = db.query(User).filter(User.mobile == mobile).first()
  104. if existing_user:
  105. raise ValueError(f"Mobile {mobile} already exists")
  106. # 3. Handle Name and English Name Generation
  107. if name:
  108. if not en_name:
  109. # Use surname full pinyin + given name initials
  110. try:
  111. en_name = generate_english_name(name)
  112. except Exception:
  113. en_name = f"user_{_generate_random_string(6)}"
  114. else:
  115. name = f"用户_{_generate_random_string(6)}"
  116. if not en_name:
  117. en_name = f"user_{_generate_random_string(6)}"
  118. final_name = UserImportService._resolve_duplicate_name(db, name)
  119. final_en_name = UserImportService._resolve_duplicate_en_name(db, en_name)
  120. # 4. Create User
  121. plain_password = _generate_random_password()
  122. password_hash = get_password_hash(plain_password)
  123. new_user = User(
  124. mobile=mobile,
  125. name=final_name,
  126. english_name=final_en_name,
  127. password_hash=password_hash,
  128. status=UserStatus.ACTIVE,
  129. role=UserRole.ORDINARY_USER
  130. )
  131. db.add(new_user)
  132. db.flush()
  133. success_count += 1
  134. results.append({
  135. "row": row_num,
  136. "status": "success",
  137. "mobile": mobile,
  138. "name": final_name,
  139. "english_name": final_en_name,
  140. "initial_password": plain_password
  141. })
  142. except Exception as e:
  143. fail_count += 1
  144. results.append({
  145. "row": row_num,
  146. "status": "fail",
  147. "error": str(e)
  148. })
  149. # Commit successful rows
  150. db.commit()
  151. # Re-calc counts if commit failed totally?
  152. # Let's refine the loop with begin_nested() for robust partial success.
  153. # Save Log
  154. log = ImportLog(
  155. filename=filename,
  156. total_count=success_count + fail_count,
  157. success_count=success_count,
  158. fail_count=fail_count,
  159. result_data=results, # Stores passwords! Warning to user about this.
  160. created_by=user_id
  161. )
  162. db.add(log)
  163. db.commit()
  164. db.refresh(log)
  165. return log
  166. @staticmethod
  167. def _resolve_duplicate_name(db: Session, base_name: str) -> str:
  168. # Simple heuristic: check base_name, then base_name1, base_name2...
  169. # Optimization: Query all matching `base_name%` and find gaps?
  170. # Simpler: Loop.
  171. # Check exact match first
  172. if not db.query(User).filter(User.name == base_name).first():
  173. return base_name
  174. i = 1
  175. while True:
  176. new_name = f"{base_name}{i}"
  177. if not db.query(User).filter(User.name == new_name).first():
  178. return new_name
  179. i += 1
  180. @staticmethod
  181. def _resolve_duplicate_en_name(db: Session, base_name: str) -> str:
  182. if not db.query(User).filter(User.english_name == base_name).first():
  183. return base_name
  184. i = 1
  185. while True:
  186. new_name = f"{base_name}{i}"
  187. if not db.query(User).filter(User.english_name == new_name).first():
  188. return new_name
  189. i += 1