# import_service.py
import io
import random
import re
import secrets
import string
from datetime import datetime
from typing import Any, Dict, List, Tuple

import pandas as pd
from sqlalchemy import or_
from sqlalchemy.orm import Session

from app.core.security import get_password_hash
from app.models.import_log import ImportLog
from app.models.user import User, UserRole, UserStatus

# from app.core.utils import generate_password
  14. # Helper to generate random password if utils doesn't have one
  15. def _generate_random_password(length=8):
  16. chars = string.ascii_letters + string.digits
  17. return ''.join(random.choice(chars) for _ in range(length))
  18. def _generate_random_string(length=6):
  19. chars = string.ascii_lowercase + string.digits
  20. return ''.join(random.choice(chars) for _ in range(length))
# Mainland-China mobile number: leading "1", second digit 3-9, then 9 more digits (11 total).
MOBILE_REGEX = r'^1[3-9]\d{9}$'
  22. class UserImportService:
  23. @staticmethod
  24. def preview_excel(file_contents: bytes, filename: str) -> Dict[str, Any]:
  25. """
  26. Read first few rows of excel to let user map columns.
  27. """
  28. try:
  29. if filename.endswith('.csv'):
  30. df = pd.read_csv(io.BytesIO(file_contents), header=None, nrows=10)
  31. else:
  32. df = pd.read_excel(io.BytesIO(file_contents), header=None, nrows=10)
  33. # Convert to list of lists (handle NaN)
  34. data = df.fillna("").values.tolist()
  35. # Generate default headers like A, B, C... if no header?
  36. # Or just return data and let frontend handle "Row 1 is header" logic.
  37. # Returning raw data is flexible.
  38. return {
  39. "preview_data": data,
  40. "filename": filename
  41. }
  42. except Exception as e:
  43. raise ValueError(f"Failed to parse file: {str(e)}")
  44. @staticmethod
  45. def process_import(
  46. db: Session,
  47. file_contents: bytes,
  48. filename: str,
  49. start_row: int, # 1-based index from frontend
  50. mapping: Dict[str, int], # Field -> Column Index (0-based)
  51. user_id: int
  52. ) -> ImportLog:
  53. """
  54. Process the Excel file based on mapping.
  55. start_row: The row index where data starts (0-based in pandas logic? Frontend usually gives 1-based).
  56. mapping: {"mobile": 0, "name": 1, "english_name": 2}
  57. """
  58. # Load Data
  59. try:
  60. if filename.endswith('.csv'):
  61. df = pd.read_csv(io.BytesIO(file_contents), header=None)
  62. else:
  63. df = pd.read_excel(io.BytesIO(file_contents), header=None)
  64. except Exception as e:
  65. raise ValueError(f"Failed to read file: {str(e)}")
  66. # Adjust start_row to 0-based
  67. data_start_idx = start_row - 1
  68. if data_start_idx < 0 or data_start_idx >= len(df):
  69. raise ValueError("Invalid start row")
  70. # Slice dataframe
  71. df_data = df.iloc[data_start_idx:]
  72. success_count = 0
  73. fail_count = 0
  74. results = [] # Stores {row: 1, status: success/fail, msg: ..., account: ..., password: ...}
  75. # Cache existing mobiles to minimize DB hits (optional, but good for batch)
  76. # For simplicity and correctness with concurrent edits, we check DB row by row or handle IntegrityError.
  77. # But logic requires "Repeated mobile cannot be imported", so check first.
  78. for index, row in df_data.iterrows():
  79. row_num = index + 1 # 1-based row number for display
  80. try:
  81. with db.begin_nested():
  82. # 1. Extract Data
  83. mobile_col = mapping.get("mobile")
  84. name_col = mapping.get("name")
  85. en_name_col = mapping.get("english_name")
  86. mobile = str(row[mobile_col]).strip() if mobile_col is not None and pd.notna(row[mobile_col]) else None
  87. name = str(row[name_col]).strip() if name_col is not None and pd.notna(row[name_col]) else None
  88. en_name = str(row[en_name_col]).strip() if en_name_col is not None and pd.notna(row[en_name_col]) else None
  89. if not mobile:
  90. raise ValueError("Mobile is required")
  91. # Validate Mobile Format
  92. if not re.match(MOBILE_REGEX, mobile):
  93. raise ValueError(f"Invalid mobile format: {mobile}")
  94. # 2. Check Mobile Uniqueness
  95. existing_user = db.query(User).filter(User.mobile == mobile).first()
  96. if existing_user:
  97. raise ValueError(f"Mobile {mobile} already exists")
  98. # 3. Handle Name Duplicates (Auto-increment or Random)
  99. if not name:
  100. name = f"用户_{_generate_random_string(6)}"
  101. final_name = UserImportService._resolve_duplicate_name(db, name)
  102. if not en_name:
  103. en_name = f"user_{_generate_random_string(6)}"
  104. final_en_name = UserImportService._resolve_duplicate_en_name(db, en_name)
  105. # 4. Create User
  106. plain_password = _generate_random_password()
  107. password_hash = get_password_hash(plain_password)
  108. new_user = User(
  109. mobile=mobile,
  110. name=final_name,
  111. english_name=final_en_name,
  112. password_hash=password_hash,
  113. status=UserStatus.ACTIVE,
  114. role=UserRole.ORDINARY_USER
  115. )
  116. db.add(new_user)
  117. db.flush()
  118. success_count += 1
  119. results.append({
  120. "row": row_num,
  121. "status": "success",
  122. "mobile": mobile,
  123. "name": final_name,
  124. "english_name": final_en_name,
  125. "initial_password": plain_password
  126. })
  127. except Exception as e:
  128. fail_count += 1
  129. results.append({
  130. "row": row_num,
  131. "status": "fail",
  132. "error": str(e)
  133. })
  134. # Commit successful rows
  135. db.commit()
  136. # Re-calc counts if commit failed totally?
  137. # Let's refine the loop with begin_nested() for robust partial success.
  138. # Save Log
  139. log = ImportLog(
  140. filename=filename,
  141. total_count=success_count + fail_count,
  142. success_count=success_count,
  143. fail_count=fail_count,
  144. result_data=results, # Stores passwords! Warning to user about this.
  145. created_by=user_id
  146. )
  147. db.add(log)
  148. db.commit()
  149. db.refresh(log)
  150. return log
  151. @staticmethod
  152. def _resolve_duplicate_name(db: Session, base_name: str) -> str:
  153. # Simple heuristic: check base_name, then base_name1, base_name2...
  154. # Optimization: Query all matching `base_name%` and find gaps?
  155. # Simpler: Loop.
  156. # Check exact match first
  157. if not db.query(User).filter(User.name == base_name).first():
  158. return base_name
  159. i = 1
  160. while True:
  161. new_name = f"{base_name}{i}"
  162. if not db.query(User).filter(User.name == new_name).first():
  163. return new_name
  164. i += 1
  165. @staticmethod
  166. def _resolve_duplicate_en_name(db: Session, base_name: str) -> str:
  167. if not db.query(User).filter(User.english_name == base_name).first():
  168. return base_name
  169. i = 1
  170. while True:
  171. new_name = f"{base_name}{i}"
  172. if not db.query(User).filter(User.english_name == new_name).first():
  173. return new_name
  174. i += 1