import hmac
import hashlib
import time
from typing import Optional

import requests
from flask import Blueprint, render_template, request, session, redirect, url_for, flash

from utils.logger_config import logger

auth_bp = Blueprint('auth', __name__)

# NOTE(review): the admin password and the SSO app secret below are hard-coded
# in source. Values are left unchanged here so behavior stays the same, but
# they should be moved to configuration / a secret store and rotated.

# Login account configuration
ADMIN_USERNAME = 'admin'
ADMIN_PASSWORD = 'HNYZ0821'

# Unified login (SSO) platform configuration
SSO_APP_ID = 'app_aa755b61de0b3da8'
SSO_APP_SECRET = 'ND5bv3WjAc8DwueDSoRXAC04XUyV1X1D'
SSO_LOGIN_URL = 'https://api.hnyunzhu.com/login'
SSO_VALIDATE_URL = 'https://api.hnyunzhu.com/api/v1/simple/validate'


def _generate_signature(secret: str, params: dict) -> str:
    """Return the HMAC-SHA256 signature (hex digest) of ``params``.

    The ``"sign"`` key and any entries whose value is ``None`` are excluded.
    The remaining entries are sorted by key and joined as ``k=v`` pairs
    separated by ``&``; that string is signed with ``secret``.
    """
    data = {k: v for k, v in params.items() if k != "sign" and v is not None}
    query_string = "&".join(f"{k}={data[k]}" for k in sorted(data))
    return hmac.new(
        secret.encode('utf-8'),
        query_string.encode('utf-8'),
        hashlib.sha256,
    ).hexdigest()


def _constant_time_equals(supplied: Optional[str], expected: str) -> bool:
    """Compare two strings in constant time; a missing value never matches a non-empty one.

    Both sides are encoded to UTF-8 bytes because ``hmac.compare_digest``
    rejects ``str`` arguments that contain non-ASCII characters.
    """
    return hmac.compare_digest(
        (supplied or '').encode('utf-8'),
        expected.encode('utf-8'),
    )


def _validate_ticket(ticket: str) -> Optional[dict]:
    """POST the ticket to the SSO validate endpoint and return the parsed reply.

    Returns the decoded JSON object, or ``None`` when the request or JSON
    decoding raises, or when the decoded body is not a JSON object.
    """
    payload = {
        "app_id": SSO_APP_ID,
        "ticket": ticket,
        "timestamp": int(time.time()),
    }
    payload["sign"] = _generate_signature(SSO_APP_SECRET, payload)
    try:
        resp = requests.post(SSO_VALIDATE_URL, json=payload, timeout=10)
        result = resp.json()
        logger.info(f"[SSO] validate 请求: url={SSO_VALIDATE_URL}, status={resp.status_code}, response={result}")
    except Exception as e:
        # Best-effort boundary: any failure here is reported as "not validated".
        logger.exception(f"[SSO] validate 请求异常: ticket={ticket[:20]}..., error={e}")
        return None

    # Callers use ``result.get(...)``; a non-object JSON body (list, string,
    # number) would otherwise raise AttributeError in the callback.
    if not isinstance(result, dict):
        return None
    return result


@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login page; on POST, check the submitted admin credentials."""
    if request.method == 'POST':
        # Evaluate both comparisons unconditionally so the check does not
        # short-circuit on the username.
        username_ok = _constant_time_equals(request.form.get('username'), ADMIN_USERNAME)
        password_ok = _constant_time_equals(request.form.get('password'), ADMIN_PASSWORD)
        if username_ok and password_ok:
            session['logged_in'] = True
            # No success flash: go straight to the main page.
            return redirect(url_for('main.index'))
        flash('账号或密码错误', 'error')
    return render_template('login.html')


@auth_bp.route('/login/sso')
def login_sso():
    """Redirect the browser to the unified login platform."""
    return redirect(f"{SSO_LOGIN_URL}?app_id={SSO_APP_ID}")


@auth_bp.route('/callback')
def sso_callback():
    """Handle the SSO callback: validate ``ticket`` and log in when ``valid`` is true."""
    ticket = request.args.get('ticket')

    # Truncate the ticket in the log, the same way _validate_ticket does,
    # instead of writing the full credential to the log.
    logged_args = dict(request.args)
    if ticket:
        logged_args['ticket'] = f"{ticket[:20]}..."
    logger.info(f"[SSO] callback 收到请求: args={logged_args}")

    if not ticket:
        logger.warning("[SSO] callback 缺少 ticket 参数")
        flash('无效的回调', 'error')
        return redirect(url_for('auth.login'))

    result = _validate_ticket(ticket)
    if result and result.get('valid') is True:
        logger.info(f"[SSO] 验证通过,登录成功: user_id={result.get('user_id')}, mobile={result.get('mobile')}")
        session['logged_in'] = True
        return redirect(url_for('main.index'))

    logger.warning(f"[SSO] 验证失败: result={result}")
    flash('统一登录验证失败', 'error')
    return redirect(url_for('auth.login'))


@auth_bp.route('/logout')
def logout():
    """Clear the login flag from the session and return to the login page."""
    session.pop('logged_in', None)
    flash('已退出登录', 'success')
    return redirect(url_for('auth.login'))