# auth.py
import hashlib
import hmac
import os
import time
from typing import Optional

import requests
from flask import Blueprint, render_template, request, session, redirect, url_for, flash

from utils.logger_config import logger
  8. auth_bp = Blueprint('auth', __name__)
  9. # 登录账号配置
  10. ADMIN_USERNAME = 'admin'
  11. ADMIN_PASSWORD = 'HNYZ0821'
  12. # 统一登录平台配置
  13. SSO_APP_ID = 'app_aa755b61de0b3da8'
  14. SSO_APP_SECRET = 'ND5bv3WjAc8DwueDSoRXAC04XUyV1X1D'
  15. SSO_LOGIN_URL = 'https://api.hnyunzhu.com/login'
  16. SSO_VALIDATE_URL = 'https://api.hnyunzhu.com/api/v1/simple/validate'
  17. def _generate_signature(secret: str, params: dict) -> str:
  18. """生成 HMAC-SHA256 签名"""
  19. data = {k: v for k, v in params.items() if k != "sign" and v is not None}
  20. sorted_keys = sorted(data.keys())
  21. query_string = "&".join([f"{k}={data[k]}" for k in sorted_keys])
  22. return hmac.new(
  23. secret.encode('utf-8'),
  24. query_string.encode('utf-8'),
  25. hashlib.sha256
  26. ).hexdigest()
  27. def _validate_ticket(ticket: str) -> Optional[dict]:
  28. """调用 UAP 验证票据"""
  29. payload = {
  30. "app_id": SSO_APP_ID,
  31. "ticket": ticket,
  32. "timestamp": int(time.time())
  33. }
  34. payload["sign"] = _generate_signature(SSO_APP_SECRET, payload)
  35. try:
  36. resp = requests.post(SSO_VALIDATE_URL, json=payload, timeout=10)
  37. result = resp.json()
  38. logger.info(f"[SSO] validate 请求: url={SSO_VALIDATE_URL}, status={resp.status_code}, response={result}")
  39. return result
  40. except Exception as e:
  41. logger.exception(f"[SSO] validate 请求异常: ticket={ticket[:20]}..., error={e}")
  42. return None
  43. @auth_bp.route('/login', methods=['GET', 'POST'])
  44. def login():
  45. """登录页面"""
  46. if request.method == 'POST':
  47. username = request.form.get('username')
  48. password = request.form.get('password')
  49. if username == ADMIN_USERNAME and password == ADMIN_PASSWORD:
  50. session['logged_in'] = True
  51. # flash('登录成功', 'success') # 登录成功不需要提示,直接进
  52. return redirect(url_for('main.index'))
  53. else:
  54. flash('账号或密码错误', 'error')
  55. return render_template('login.html')
  56. return render_template('login.html')
  57. @auth_bp.route('/login/sso')
  58. def login_sso():
  59. """跳转到统一登录平台"""
  60. return redirect(f"{SSO_LOGIN_URL}?app_id={SSO_APP_ID}")
  61. @auth_bp.route('/callback')
  62. def sso_callback():
  63. """统一登录回调:验证 ticket,valid 通过则登录"""
  64. ticket = request.args.get('ticket')
  65. logger.info(f"[SSO] callback 收到请求: args={dict(request.args)}")
  66. if not ticket:
  67. logger.warning("[SSO] callback 缺少 ticket 参数")
  68. flash('无效的回调', 'error')
  69. return redirect(url_for('auth.login'))
  70. result = _validate_ticket(ticket)
  71. if result and result.get('valid') is True:
  72. logger.info(f"[SSO] 验证通过,登录成功: user_id={result.get('user_id')}, mobile={result.get('mobile')}")
  73. session['logged_in'] = True
  74. return redirect(url_for('main.index'))
  75. else:
  76. logger.warning(f"[SSO] 验证失败: result={result}")
  77. flash('统一登录验证失败', 'error')
  78. return redirect(url_for('auth.login'))
  79. @auth_bp.route('/logout')
  80. def logout():
  81. """登出"""
  82. session.pop('logged_in', None)
  83. flash('已退出登录', 'success')
  84. return redirect(url_for('auth.login'))