# mapping_service.py
  1. import io
  2. import pandas as pd
  3. import logging
  4. from datetime import datetime
  5. from typing import List, Tuple
  6. from sqlalchemy.orm import Session
  7. from fastapi import HTTPException
  8. from app.models.user import User, UserRole, UserStatus
  9. from app.models.mapping import AppUserMapping
  10. from app.schemas.mapping import (
  11. MappingRowPreview,
  12. MappingRowStatus,
  13. MappingPreviewResponse,
  14. MappingStrategy,
  15. MappingImportSummary,
  16. ImportLogResponse,
  17. ImportLogItem
  18. )
  19. from app.core import security
  20. from app.services.sms_service import SmsService
  21. logger = logging.getLogger(__name__)
  22. class MappingService:
  23. @staticmethod
  24. def process_excel_file(file_content: bytes) -> pd.DataFrame:
  25. """
  26. Reads excel file bytes into DataFrame.
  27. Expected columns: 'mobile'/'手机号', 'mapped_key'/'映射账号'
  28. """
  29. try:
  30. # Try reading as Excel first
  31. try:
  32. df = pd.read_excel(io.BytesIO(file_content))
  33. except:
  34. # Fallback to CSV
  35. df = pd.read_csv(io.BytesIO(file_content))
  36. # Normalize headers to support Chinese and English
  37. # Map: 手机号 -> mobile, 映射账号 -> mapped_key, 映射邮箱 -> mapped_email
  38. column_map = {
  39. '手机号': 'mobile',
  40. '映射账号': 'mapped_key',
  41. '映射邮箱': 'mapped_email',
  42. 'user mobile': 'mobile',
  43. 'mapped key': 'mapped_key',
  44. 'mapped email': 'mapped_email'
  45. }
  46. new_columns = []
  47. for col in df.columns:
  48. col_str = str(col).strip()
  49. col_lower = col_str.lower()
  50. if col_str in column_map:
  51. new_columns.append(column_map[col_str])
  52. elif col_lower in column_map:
  53. new_columns.append(column_map[col_lower])
  54. else:
  55. new_columns.append(col_lower)
  56. df.columns = new_columns
  57. return df
  58. except Exception as e:
  59. logger.error(f"文件解析失败: {e}", exc_info=True)
  60. raise ValueError(f"文件格式无效: {str(e)}")
  61. @staticmethod
  62. def preview_import(db: Session, app_id: int, file_content: bytes, filename: str = "") -> MappingPreviewResponse:
  63. try:
  64. df = MappingService.process_excel_file(file_content)
  65. required_cols = {'mobile', 'mapped_key'}
  66. if not required_cols.issubset(df.columns):
  67. raise ValueError(f"缺少必要列。请确保文件包含: 手机号(mobile), 映射账号(mapped_key)")
  68. preview_rows: List[MappingRowPreview] = []
  69. stats = {"valid": 0, "error": 0, "new": 0, "update": 0}
  70. # 1. Batch query all users involved
  71. mobiles = df['mobile'].astype(str).unique().tolist()
  72. users = db.query(User).filter(User.mobile.in_(mobiles)).all()
  73. user_map = {u.mobile: u.id for u in users}
  74. # 2. Batch query existing mappings for this app
  75. user_ids = list(user_map.values())
  76. existing_mappings = db.query(AppUserMapping).filter(
  77. AppUserMapping.app_id == app_id,
  78. AppUserMapping.user_id.in_(user_ids)
  79. ).all()
  80. mapping_map = {m.user_id: m for m in existing_mappings}
  81. # 3. Iterate rows
  82. for index, row in df.iterrows():
  83. mobile = str(row['mobile']).strip()
  84. mapped_key = str(row['mapped_key']).strip()
  85. mapped_email = None
  86. if 'mapped_email' in df.columns and not pd.isna(row['mapped_email']):
  87. val = str(row['mapped_email']).strip()
  88. mapped_email = val if val else None
  89. row_preview = MappingRowPreview(
  90. row_index=index + 1, # 1-based index for UI
  91. mobile=mobile,
  92. mapped_key=mapped_key,
  93. mapped_email=mapped_email,
  94. status=MappingRowStatus.ERROR
  95. )
  96. if not mobile or not mapped_key or mobile == 'nan' or mapped_key == 'nan':
  97. row_preview.message = "手机号或映射账号为空"
  98. stats["error"] += 1
  99. elif mobile not in user_map:
  100. row_preview.status = MappingRowStatus.AUTO_CREATE_USER
  101. row_preview.message = "用户不存在,将自动创建"
  102. stats["new"] += 1
  103. stats["valid"] += 1
  104. else:
  105. user_id = user_map[mobile]
  106. row_preview.user_id = user_id
  107. if user_id in mapping_map:
  108. row_preview.status = MappingRowStatus.UPDATE
  109. row_preview.message = f"当前映射: {mapping_map[user_id].mapped_key}"
  110. stats["update"] += 1
  111. stats["valid"] += 1
  112. else:
  113. row_preview.status = MappingRowStatus.NEW
  114. stats["new"] += 1
  115. stats["valid"] += 1
  116. preview_rows.append(row_preview)
  117. logger.info(f"导入预览完成: App {app_id}, 文件 {filename}, 行数 {len(df)}")
  118. return MappingPreviewResponse(
  119. total_rows=len(df),
  120. valid_count=stats["valid"],
  121. error_count=stats["error"],
  122. new_count=stats["new"],
  123. update_count=stats["update"],
  124. preview_rows=preview_rows
  125. )
  126. except Exception as e:
  127. logger.error(f"导入预览异常: {e}", exc_info=True)
  128. raise
  129. @staticmethod
  130. def execute_import(
  131. db: Session,
  132. app_id: int,
  133. file_content: bytes,
  134. filename: str,
  135. strategy: MappingStrategy,
  136. current_user_mobile: str,
  137. verification_code: str
  138. ) -> ImportLogResponse:
  139. # 0. Verify SMS Code
  140. if not SmsService.verify_code(current_user_mobile, verification_code):
  141. logger.warning(f"导入执行失败: 验证码错误 (User: {current_user_mobile})")
  142. raise HTTPException(status_code=400, detail="验证码无效或已过期")
  143. preview = MappingService.preview_import(db, app_id, file_content, filename)
  144. summary = MappingImportSummary(total_processed=0, inserted=0, updated=0, failed=0)
  145. logs: List[ImportLogItem] = []
  146. import_time = datetime.now()
  147. for row in preview.preview_rows:
  148. summary.total_processed += 1
  149. log_item = ImportLogItem(
  150. import_time=import_time,
  151. mobile=row.mobile,
  152. mapped_key=row.mapped_key,
  153. mapped_email=row.mapped_email,
  154. status="Failed",
  155. message="",
  156. is_unified_account_created=False
  157. )
  158. if row.status == MappingRowStatus.ERROR:
  159. summary.failed += 1
  160. log_item.status = "Failed"
  161. log_item.message = row.message or "数据错误"
  162. logs.append(log_item)
  163. continue
  164. try:
  165. if row.status == MappingRowStatus.NEW or row.status == MappingRowStatus.AUTO_CREATE_USER:
  166. user_id = row.user_id
  167. if row.status == MappingRowStatus.AUTO_CREATE_USER:
  168. # Create User
  169. pwd = security.generate_alphanumeric_password(8)
  170. # Generate random suffix for name
  171. random_suffix = security.generate_alphanumeric_password(6)
  172. new_user = User(
  173. mobile=row.mobile,
  174. password_hash=security.get_password_hash(pwd),
  175. status=UserStatus.ACTIVE,
  176. role=UserRole.ORDINARY_USER,
  177. name=f"用户{random_suffix}",
  178. english_name=row.mapped_key
  179. )
  180. db.add(new_user)
  181. db.flush() # flush to get ID
  182. user_id = new_user.id
  183. log_item.is_unified_account_created = True
  184. log_item.generated_password = pwd
  185. # Check uniqueness before insert (Concurrency edge case or just double check)
  186. # We rely on unique constraint but catching it is cleaner
  187. new_mapping = AppUserMapping(
  188. app_id=app_id,
  189. user_id=user_id,
  190. mapped_key=row.mapped_key,
  191. mapped_email=row.mapped_email
  192. )
  193. db.add(new_mapping)
  194. summary.inserted += 1
  195. log_item.status = "Success"
  196. log_item.message = "映射创建成功"
  197. elif row.status == MappingRowStatus.UPDATE:
  198. if strategy == MappingStrategy.OVERWRITE:
  199. # Find and update
  200. existing = db.query(AppUserMapping).filter(
  201. AppUserMapping.app_id == app_id,
  202. AppUserMapping.user_id == row.user_id
  203. ).first()
  204. if existing:
  205. existing.mapped_key = row.mapped_key
  206. existing.mapped_email = row.mapped_email
  207. db.add(existing)
  208. summary.updated += 1
  209. log_item.status = "Success"
  210. log_item.message = "映射更新成功"
  211. else:
  212. # SKIP
  213. log_item.status = "Skipped"
  214. log_item.message = "映射已存在 (已跳过)"
  215. except Exception as e:
  216. summary.failed += 1
  217. log_item.status = "Failed"
  218. log_item.message = str(e)
  219. logger.error(f"导入行失败 (Mobile: {row.mobile}): {e}")
  220. logs.append(log_item)
  221. db.commit()
  222. return ImportLogResponse(summary=summary, logs=logs)