# mapping_service.py
import io
import logging
from datetime import datetime
from typing import List, Tuple

import pandas as pd
from fastapi import HTTPException
from sqlalchemy.orm import Session

from app.core import security
from app.models.mapping import AppUserMapping
from app.models.user import User, UserRole, UserStatus
from app.schemas.mapping import (
    MappingRowPreview,
    MappingRowStatus,
    MappingPreviewResponse,
    MappingStrategy,
    MappingImportSummary,
    ImportLogResponse,
    ImportLogItem
)
from app.services.sms_service import SmsService
  20. class MappingService:
  21. @staticmethod
  22. def process_excel_file(file_content: bytes) -> pd.DataFrame:
  23. """
  24. Reads excel file bytes into DataFrame.
  25. Expected columns: 'mobile'/'手机号', 'mapped_key'/'映射账号'
  26. """
  27. try:
  28. # Try reading as Excel first
  29. try:
  30. df = pd.read_excel(io.BytesIO(file_content))
  31. except:
  32. # Fallback to CSV
  33. df = pd.read_csv(io.BytesIO(file_content))
  34. # Normalize headers to support Chinese and English
  35. # Map: 手机号 -> mobile, 映射账号 -> mapped_key, 映射邮箱 -> mapped_email
  36. column_map = {
  37. '手机号': 'mobile',
  38. '映射账号': 'mapped_key',
  39. '映射邮箱': 'mapped_email',
  40. 'user mobile': 'mobile',
  41. 'mapped key': 'mapped_key',
  42. 'mapped email': 'mapped_email'
  43. }
  44. new_columns = []
  45. for col in df.columns:
  46. col_str = str(col).strip()
  47. col_lower = col_str.lower()
  48. if col_str in column_map:
  49. new_columns.append(column_map[col_str])
  50. elif col_lower in column_map:
  51. new_columns.append(column_map[col_lower])
  52. else:
  53. new_columns.append(col_lower)
  54. df.columns = new_columns
  55. return df
  56. except Exception as e:
  57. raise ValueError(f"文件格式无效: {str(e)}")
  58. @staticmethod
  59. def preview_import(db: Session, app_id: int, file_content: bytes, filename: str = "") -> MappingPreviewResponse:
  60. df = MappingService.process_excel_file(file_content)
  61. required_cols = {'mobile', 'mapped_key'}
  62. if not required_cols.issubset(df.columns):
  63. raise ValueError(f"缺少必要列。请确保文件包含: 手机号(mobile), 映射账号(mapped_key)")
  64. preview_rows: List[MappingRowPreview] = []
  65. stats = {"valid": 0, "error": 0, "new": 0, "update": 0}
  66. # 1. Batch query all users involved
  67. mobiles = df['mobile'].astype(str).unique().tolist()
  68. users = db.query(User).filter(User.mobile.in_(mobiles)).all()
  69. user_map = {u.mobile: u.id for u in users}
  70. # 2. Batch query existing mappings for this app
  71. user_ids = list(user_map.values())
  72. existing_mappings = db.query(AppUserMapping).filter(
  73. AppUserMapping.app_id == app_id,
  74. AppUserMapping.user_id.in_(user_ids)
  75. ).all()
  76. mapping_map = {m.user_id: m for m in existing_mappings}
  77. # 3. Iterate rows
  78. for index, row in df.iterrows():
  79. mobile = str(row['mobile']).strip()
  80. mapped_key = str(row['mapped_key']).strip()
  81. mapped_email = None
  82. if 'mapped_email' in df.columns and not pd.isna(row['mapped_email']):
  83. val = str(row['mapped_email']).strip()
  84. mapped_email = val if val else None
  85. row_preview = MappingRowPreview(
  86. row_index=index + 1, # 1-based index for UI
  87. mobile=mobile,
  88. mapped_key=mapped_key,
  89. mapped_email=mapped_email,
  90. status=MappingRowStatus.ERROR
  91. )
  92. if not mobile or not mapped_key or mobile == 'nan' or mapped_key == 'nan':
  93. row_preview.message = "手机号或映射账号为空"
  94. stats["error"] += 1
  95. elif mobile not in user_map:
  96. row_preview.status = MappingRowStatus.AUTO_CREATE_USER
  97. row_preview.message = "用户不存在,将自动创建"
  98. stats["new"] += 1
  99. stats["valid"] += 1
  100. else:
  101. user_id = user_map[mobile]
  102. row_preview.user_id = user_id
  103. if user_id in mapping_map:
  104. row_preview.status = MappingRowStatus.UPDATE
  105. row_preview.message = f"当前映射: {mapping_map[user_id].mapped_key}"
  106. stats["update"] += 1
  107. stats["valid"] += 1
  108. else:
  109. row_preview.status = MappingRowStatus.NEW
  110. stats["new"] += 1
  111. stats["valid"] += 1
  112. preview_rows.append(row_preview)
  113. return MappingPreviewResponse(
  114. total_rows=len(df),
  115. valid_count=stats["valid"],
  116. error_count=stats["error"],
  117. new_count=stats["new"],
  118. update_count=stats["update"],
  119. preview_rows=preview_rows
  120. )
  121. @staticmethod
  122. def execute_import(
  123. db: Session,
  124. app_id: int,
  125. file_content: bytes,
  126. filename: str,
  127. strategy: MappingStrategy,
  128. current_user_mobile: str,
  129. verification_code: str
  130. ) -> ImportLogResponse:
  131. # 0. Verify SMS Code
  132. if not SmsService.verify_code(current_user_mobile, verification_code):
  133. raise HTTPException(status_code=400, detail="验证码无效或已过期")
  134. preview = MappingService.preview_import(db, app_id, file_content, filename)
  135. summary = MappingImportSummary(total_processed=0, inserted=0, updated=0, failed=0)
  136. logs: List[ImportLogItem] = []
  137. import_time = datetime.now()
  138. for row in preview.preview_rows:
  139. summary.total_processed += 1
  140. log_item = ImportLogItem(
  141. import_time=import_time,
  142. mobile=row.mobile,
  143. mapped_key=row.mapped_key,
  144. mapped_email=row.mapped_email,
  145. status="Failed",
  146. message="",
  147. is_unified_account_created=False
  148. )
  149. if row.status == MappingRowStatus.ERROR:
  150. summary.failed += 1
  151. log_item.status = "Failed"
  152. log_item.message = row.message or "数据错误"
  153. logs.append(log_item)
  154. continue
  155. try:
  156. if row.status == MappingRowStatus.NEW or row.status == MappingRowStatus.AUTO_CREATE_USER:
  157. user_id = row.user_id
  158. if row.status == MappingRowStatus.AUTO_CREATE_USER:
  159. # Create User
  160. pwd = security.generate_alphanumeric_password(8)
  161. new_user = User(
  162. mobile=row.mobile,
  163. password_hash=security.get_password_hash(pwd),
  164. status=UserStatus.ACTIVE,
  165. role=UserRole.ORDINARY_USER
  166. )
  167. db.add(new_user)
  168. db.flush() # flush to get ID
  169. user_id = new_user.id
  170. log_item.is_unified_account_created = True
  171. log_item.generated_password = pwd
  172. # Check uniqueness before insert (Concurrency edge case or just double check)
  173. # We rely on unique constraint but catching it is cleaner
  174. new_mapping = AppUserMapping(
  175. app_id=app_id,
  176. user_id=user_id,
  177. mapped_key=row.mapped_key,
  178. mapped_email=row.mapped_email
  179. )
  180. db.add(new_mapping)
  181. summary.inserted += 1
  182. log_item.status = "Success"
  183. log_item.message = "映射创建成功"
  184. elif row.status == MappingRowStatus.UPDATE:
  185. if strategy == MappingStrategy.OVERWRITE:
  186. # Find and update
  187. existing = db.query(AppUserMapping).filter(
  188. AppUserMapping.app_id == app_id,
  189. AppUserMapping.user_id == row.user_id
  190. ).first()
  191. if existing:
  192. existing.mapped_key = row.mapped_key
  193. existing.mapped_email = row.mapped_email
  194. db.add(existing)
  195. summary.updated += 1
  196. log_item.status = "Success"
  197. log_item.message = "映射更新成功"
  198. else:
  199. # SKIP
  200. log_item.status = "Skipped"
  201. log_item.message = "映射已存在 (已跳过)"
  202. except Exception as e:
  203. summary.failed += 1
  204. log_item.status = "Failed"
  205. log_item.message = str(e)
  206. logs.append(log_item)
  207. db.commit()
  208. return ImportLogResponse(summary=summary, logs=logs)