| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- import hmac
- import hashlib
- import time
- from typing import Optional
- import requests
- from flask import Blueprint, render_template, request, session, redirect, url_for, flash
- from utils.logger_config import logger
- auth_bp = Blueprint('auth', __name__)
- # 登录账号配置
- ADMIN_USERNAME = 'admin'
- ADMIN_PASSWORD = 'HNYZ0821'
- # 统一登录平台配置
- SSO_APP_ID = 'app_aa755b61de0b3da8'
- SSO_APP_SECRET = 'ND5bv3WjAc8DwueDSoRXAC04XUyV1X1D'
- SSO_LOGIN_URL = 'https://api.hnyunzhu.com/login'
- SSO_VALIDATE_URL = 'https://api.hnyunzhu.com/api/v1/simple/validate'
- def _generate_signature(secret: str, params: dict) -> str:
- """生成 HMAC-SHA256 签名"""
- data = {k: v for k, v in params.items() if k != "sign" and v is not None}
- sorted_keys = sorted(data.keys())
- query_string = "&".join([f"{k}={data[k]}" for k in sorted_keys])
- return hmac.new(
- secret.encode('utf-8'),
- query_string.encode('utf-8'),
- hashlib.sha256
- ).hexdigest()
- def _validate_ticket(ticket: str) -> Optional[dict]:
- """调用 UAP 验证票据"""
- payload = {
- "app_id": SSO_APP_ID,
- "ticket": ticket,
- "timestamp": int(time.time())
- }
- payload["sign"] = _generate_signature(SSO_APP_SECRET, payload)
- try:
- resp = requests.post(SSO_VALIDATE_URL, json=payload, timeout=10)
- result = resp.json()
- logger.info(f"[SSO] validate 请求: url={SSO_VALIDATE_URL}, status={resp.status_code}, response={result}")
- return result
- except Exception as e:
- logger.exception(f"[SSO] validate 请求异常: ticket={ticket[:20]}..., error={e}")
- return None
- @auth_bp.route('/login', methods=['GET', 'POST'])
- def login():
- """登录页面"""
- if request.method == 'POST':
- username = request.form.get('username')
- password = request.form.get('password')
-
- if username == ADMIN_USERNAME and password == ADMIN_PASSWORD:
- session['logged_in'] = True
- # flash('登录成功', 'success') # 登录成功不需要提示,直接进
- return redirect(url_for('main.index'))
- else:
- flash('账号或密码错误', 'error')
- return render_template('login.html')
-
- return render_template('login.html')
- @auth_bp.route('/login/sso')
- def login_sso():
- """跳转到统一登录平台"""
- return redirect(f"{SSO_LOGIN_URL}?app_id={SSO_APP_ID}")
- @auth_bp.route('/callback')
- def sso_callback():
- """统一登录回调:验证 ticket,valid 通过则登录"""
- ticket = request.args.get('ticket')
- logger.info(f"[SSO] callback 收到请求: args={dict(request.args)}")
- if not ticket:
- logger.warning("[SSO] callback 缺少 ticket 参数")
- flash('无效的回调', 'error')
- return redirect(url_for('auth.login'))
- result = _validate_ticket(ticket)
- if result and result.get('valid') is True:
- logger.info(f"[SSO] 验证通过,登录成功: user_id={result.get('user_id')}, mobile={result.get('mobile')}")
- session['logged_in'] = True
- return redirect(url_for('main.index'))
- else:
- logger.warning(f"[SSO] 验证失败: result={result}")
- flash('统一登录验证失败', 'error')
- return redirect(url_for('auth.login'))
- @auth_bp.route('/logout')
- def logout():
- """登出"""
- session.pop('logged_in', None)
- flash('已退出登录', 'success')
- return redirect(url_for('auth.login'))
|