auth.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  import hashlib
  import hmac
  import os
  import time
  from typing import Optional
  from urllib.parse import urlparse

  import requests
  from flask import Blueprint, render_template, request, session, redirect, url_for, flash

  from utils.logger_config import logger
  # Blueprint that the login, SSO and logout routes in this module attach to.
  auth_bp = Blueprint(name='auth', import_name=__name__)
  # Local administrator account.
  # Each credential can be overridden with an environment variable of the same
  # name; the literal is only the fallback used when the variable is unset or
  # empty, so existing deployments keep working unchanged.
  # NOTE(review): the fallbacks look like live credentials committed to source —
  # rotate them and provide the real values through the environment.
  ADMIN_USERNAME = os.environ.get('ADMIN_USERNAME') or 'admin'
  ADMIN_PASSWORD = os.environ.get('ADMIN_PASSWORD') or 'HNYZ0821'

  # Unified login (SSO) platform settings; app id and secret follow the same
  # environment-override rule as the admin credentials above.
  SSO_APP_ID = os.environ.get('SSO_APP_ID') or 'app_aa755b61de0b3da8'
  SSO_APP_SECRET = os.environ.get('SSO_APP_SECRET') or 'ND5bv3WjAc8DwueDSoRXAC04XUyV1X1D'
  SSO_LOGIN_URL = 'https://api.hnyunzhu.com/login'
  SSO_VALIDATE_URL = 'https://api.hnyunzhu.com/api/v1/simple/validate'
  18. def _generate_signature(secret: str, params: dict) -> str:
  19. """生成 HMAC-SHA256 签名"""
  20. data = {k: v for k, v in params.items() if k != "sign" and v is not None}
  21. sorted_keys = sorted(data.keys())
  22. query_string = "&".join([f"{k}={data[k]}" for k in sorted_keys])
  23. return hmac.new(
  24. secret.encode('utf-8'),
  25. query_string.encode('utf-8'),
  26. hashlib.sha256
  27. ).hexdigest()
  28. def _is_safe_redirect_url(target: str) -> bool:
  29. """检查 next 是否为安全重定向目标(相对路径或同域)"""
  30. if not target or not target.strip():
  31. return False
  32. parsed = urlparse(target)
  33. # 允许相对路径(无 netloc)
  34. if not parsed.netloc:
  35. return target.startswith('/')
  36. # 允许同域
  37. ref = urlparse(request.host_url)
  38. return parsed.netloc == ref.netloc
  def _validate_ticket(ticket: str) -> Optional[dict]:
      """Ask the unified login platform whether ``ticket`` is valid.

      Sends a JSON POST to ``SSO_VALIDATE_URL`` containing the app id, the
      ticket, the current Unix timestamp and an HMAC signature over those
      three fields.

      Returns:
          The decoded JSON object from the platform, or ``None`` when the
          request fails, the body is not valid JSON, or the JSON is not an
          object (so callers can always use ``.get`` on a non-None result).
      """
      payload = {
          "app_id": SSO_APP_ID,
          "ticket": ticket,
          "timestamp": int(time.time()),
      }
      payload["sign"] = _generate_signature(SSO_APP_SECRET, payload)
      try:
          resp = requests.post(SSO_VALIDATE_URL, json=payload, timeout=10)
          result = resp.json()
      except (requests.RequestException, ValueError) as e:
          # Network/HTTP-layer failures and undecodable bodies both count as
          # "could not validate"; only a ticket prefix goes to the log.
          logger.exception(f"[SSO] validate 请求异常: ticket={ticket[:20]}..., error={e}")
          return None
      logger.info(f"[SSO] validate 请求: url={SSO_VALIDATE_URL}, status={resp.status_code}, response={result}")
      if not isinstance(result, dict):
          # Valid JSON that is not an object (list, string, null, ...) cannot
          # carry the 'valid' flag the caller looks up.
          logger.warning(f"[SSO] validate 响应不是 JSON 对象: type={type(result).__name__}")
          return None
      return result
  @auth_bp.route('/login', methods=['GET', 'POST'])
  def login():
      """Render the login page and handle the username/password form.

      GET renders ``login.html``. POST compares the submitted ``username`` and
      ``password`` form fields with the configured admin account: on a match
      the session is marked as logged in and the user is redirected to
      ``main.index``; otherwise an error is flashed and the form is rendered
      again.
      """
      if request.method != 'POST':
          return render_template('login.html')

      # Missing fields become '' so they can be encoded and compared.
      username = request.form.get('username', '')
      password = request.form.get('password', '')

      # Constant-time comparison avoids leaking how much of a credential
      # matched. Bytes are compared because compare_digest rejects str values
      # containing non-ASCII characters. Both checks always run.
      user_ok = hmac.compare_digest(username.encode('utf-8'), ADMIN_USERNAME.encode('utf-8'))
      pass_ok = hmac.compare_digest(password.encode('utf-8'), ADMIN_PASSWORD.encode('utf-8'))

      if user_ok and pass_ok:
          session['logged_in'] = True
          # No success flash on purpose: the user goes straight to the index.
          return redirect(url_for('main.index'))

      flash('账号或密码错误', 'error')
      return render_template('login.html')
  @auth_bp.route('/login/sso')
  def login_sso():
      """Redirect the browser to the unified login platform for this app id."""
      sso_entry = SSO_LOGIN_URL + "?app_id=" + SSO_APP_ID
      return redirect(sso_entry)
  @auth_bp.route('/callback')
  def sso_callback():
      """Handle the unified login callback.

      Reads the ``ticket`` query parameter and validates it through
      ``_validate_ticket``. When the platform answers with ``valid`` set to
      ``True`` the session is marked as logged in and the user is redirected
      to the ``next`` query parameter (only if it passes
      ``_is_safe_redirect_url``) or to ``main.index``. A missing ticket or a
      failed validation flashes an error and redirects to the login page.
      """
      ticket = request.args.get('ticket')

      # Log the query arguments with the ticket truncated, matching how
      # _validate_ticket logs it, so the full ticket never reaches the log.
      logged_args = dict(request.args)
      if 'ticket' in logged_args and ticket:
          logged_args['ticket'] = f"{ticket[:20]}..."
      logger.info(f"[SSO] callback 收到请求: args={logged_args}")

      if not ticket:
          logger.warning("[SSO] callback 缺少 ticket 参数")
          flash('无效的回调', 'error')
          return redirect(url_for('auth.login'))

      result = _validate_ticket(ticket)
      # Anything other than a JSON object with valid == True is a failure;
      # the isinstance guard keeps an unexpected payload from raising here.
      if not (isinstance(result, dict) and result.get('valid') is True):
          logger.warning(f"[SSO] 验证失败: result={result}")
          flash('统一登录验证失败', 'error')
          return redirect(url_for('auth.login'))

      logger.info(f"[SSO] 验证通过,登录成功: user_id={result.get('user_id')}, mobile={result.get('mobile')}")
      session['logged_in'] = True

      next_url = request.args.get('next')
      if next_url and _is_safe_redirect_url(next_url):
          return redirect(next_url)
      return redirect(url_for('main.index'))
  @auth_bp.route('/logout')
  def logout():
      """Drop the login flag, flash a confirmation and go back to the login page."""
      session.pop('logged_in', None)
      flash('已退出登录', 'success')
      login_page = url_for('auth.login')
      return redirect(login_page)