# import_service.py
import io
import random
import re
import secrets
import string
from datetime import datetime
from typing import List, Dict, Any, Tuple

import pandas as pd
from sqlalchemy import or_
from sqlalchemy.orm import Session
from xpinyin import Pinyin

from app.core.security import get_password_hash
from app.core.utils import generate_english_name
# from app.core.utils import generate_password
from app.models.import_log import ImportLog
from app.models.user import User, UserStatus, UserRole
  16. # Helper to generate random password if utils doesn't have one
  17. def _generate_random_password(length=8):
  18. chars = string.ascii_letters + string.digits
  19. return ''.join(random.choice(chars) for _ in range(length))
  20. def _generate_random_string(length=6):
  21. chars = string.ascii_lowercase + string.digits
  22. return ''.join(random.choice(chars) for _ in range(length))
# 11-digit mobile number: leading "1", second digit 3-9, then 9 more digits
# (mainland-China mobile format).
MOBILE_REGEX = r'^1[3-9]\d{9}$'
  24. class UserImportService:
  25. @staticmethod
  26. def preview_excel(file_contents: bytes, filename: str) -> Dict[str, Any]:
  27. """
  28. Read first few rows of excel to let user map columns.
  29. """
  30. try:
  31. if filename.endswith('.csv'):
  32. df = pd.read_csv(io.BytesIO(file_contents), header=None, nrows=10)
  33. else:
  34. df = pd.read_excel(io.BytesIO(file_contents), header=None, nrows=10)
  35. # Convert to list of lists (handle NaN)
  36. data = df.fillna("").values.tolist()
  37. # Generate default headers like A, B, C... if no header?
  38. # Or just return data and let frontend handle "Row 1 is header" logic.
  39. # Returning raw data is flexible.
  40. return {
  41. "preview_data": data,
  42. "filename": filename
  43. }
  44. except Exception as e:
  45. raise ValueError(f"Failed to parse file: {str(e)}")
  46. @staticmethod
  47. def process_import(
  48. db: Session,
  49. file_contents: bytes,
  50. filename: str,
  51. start_row: int, # 1-based index from frontend
  52. mapping: Dict[str, int], # Field -> Column Index (0-based)
  53. user_id: int
  54. ) -> ImportLog:
  55. """
  56. Process the Excel file based on mapping.
  57. start_row: The row index where data starts (0-based in pandas logic? Frontend usually gives 1-based).
  58. mapping: {"mobile": 0, "name": 1, "english_name": 2}
  59. """
  60. # Load Data
  61. try:
  62. if filename.endswith('.csv'):
  63. df = pd.read_csv(io.BytesIO(file_contents), header=None)
  64. else:
  65. df = pd.read_excel(io.BytesIO(file_contents), header=None)
  66. except Exception as e:
  67. raise ValueError(f"Failed to read file: {str(e)}")
  68. # Adjust start_row to 0-based
  69. data_start_idx = start_row - 1
  70. if data_start_idx < 0 or data_start_idx >= len(df):
  71. raise ValueError("Invalid start row")
  72. # Slice dataframe
  73. df_data = df.iloc[data_start_idx:]
  74. success_count = 0
  75. fail_count = 0
  76. results = [] # Stores {row: 1, status: success/fail, msg: ..., account: ..., password: ...}
  77. # Cache existing mobiles to minimize DB hits (optional, but good for batch)
  78. # For simplicity and correctness with concurrent edits, we check DB row by row or handle IntegrityError.
  79. # But logic requires "Repeated mobile cannot be imported", so check first.
  80. p = Pinyin()
  81. for index, row in df_data.iterrows():
  82. row_num = index + 1 # 1-based row number for display
  83. try:
  84. with db.begin_nested():
  85. # 1. Extract Data
  86. mobile_col = mapping.get("mobile")
  87. name_col = mapping.get("name")
  88. en_name_col = mapping.get("english_name")
  89. mobile = str(row[mobile_col]).strip() if mobile_col is not None and pd.notna(row[mobile_col]) else None
  90. name = str(row[name_col]).strip() if name_col is not None and pd.notna(row[name_col]) else None
  91. en_name = str(row[en_name_col]).strip() if en_name_col is not None and pd.notna(row[en_name_col]) else None
  92. if not mobile:
  93. raise ValueError("Mobile is required")
  94. # Validate Mobile Format
  95. if not re.match(MOBILE_REGEX, mobile):
  96. raise ValueError(f"Invalid mobile format: {mobile}")
  97. # 2. Check Mobile Uniqueness
  98. existing_user = db.query(User).filter(User.mobile == mobile).first()
  99. if existing_user:
  100. raise ValueError(f"Mobile {mobile} already exists")
  101. # 3. Handle Name and English Name Generation
  102. if name:
  103. if not en_name:
  104. # Use surname full pinyin + given name initials
  105. try:
  106. en_name = generate_english_name(name)
  107. except Exception:
  108. en_name = f"user_{_generate_random_string(6)}"
  109. else:
  110. name = f"用户_{_generate_random_string(6)}"
  111. if not en_name:
  112. en_name = f"user_{_generate_random_string(6)}"
  113. final_name = UserImportService._resolve_duplicate_name(db, name)
  114. final_en_name = UserImportService._resolve_duplicate_en_name(db, en_name)
  115. # 4. Create User
  116. plain_password = _generate_random_password()
  117. password_hash = get_password_hash(plain_password)
  118. new_user = User(
  119. mobile=mobile,
  120. name=final_name,
  121. english_name=final_en_name,
  122. password_hash=password_hash,
  123. status=UserStatus.ACTIVE,
  124. role=UserRole.ORDINARY_USER
  125. )
  126. db.add(new_user)
  127. db.flush()
  128. success_count += 1
  129. results.append({
  130. "row": row_num,
  131. "status": "success",
  132. "mobile": mobile,
  133. "name": final_name,
  134. "english_name": final_en_name,
  135. "initial_password": plain_password
  136. })
  137. except Exception as e:
  138. fail_count += 1
  139. results.append({
  140. "row": row_num,
  141. "status": "fail",
  142. "error": str(e)
  143. })
  144. # Commit successful rows
  145. db.commit()
  146. # Re-calc counts if commit failed totally?
  147. # Let's refine the loop with begin_nested() for robust partial success.
  148. # Save Log
  149. log = ImportLog(
  150. filename=filename,
  151. total_count=success_count + fail_count,
  152. success_count=success_count,
  153. fail_count=fail_count,
  154. result_data=results, # Stores passwords! Warning to user about this.
  155. created_by=user_id
  156. )
  157. db.add(log)
  158. db.commit()
  159. db.refresh(log)
  160. return log
  161. @staticmethod
  162. def _resolve_duplicate_name(db: Session, base_name: str) -> str:
  163. # Simple heuristic: check base_name, then base_name1, base_name2...
  164. # Optimization: Query all matching `base_name%` and find gaps?
  165. # Simpler: Loop.
  166. # Check exact match first
  167. if not db.query(User).filter(User.name == base_name).first():
  168. return base_name
  169. i = 1
  170. while True:
  171. new_name = f"{base_name}{i}"
  172. if not db.query(User).filter(User.name == new_name).first():
  173. return new_name
  174. i += 1
  175. @staticmethod
  176. def _resolve_duplicate_en_name(db: Session, base_name: str) -> str:
  177. if not db.query(User).filter(User.english_name == base_name).first():
  178. return base_name
  179. i = 1
  180. while True:
  181. new_name = f"{base_name}{i}"
  182. if not db.query(User).filter(User.english_name == new_name).first():
  183. return new_name
  184. i += 1