from flask import Blueprint, jsonify, request from hardware.door_module import set_emergency_control, open_door_control, load_config from utils.logger_config import logger from api.utils import login_required, ping_ip import threading door_bp = Blueprint('door', __name__) @door_bp.route('/api/door/control', methods=['POST']) @login_required def door_control_api(): """门禁控制接口""" try: data = request.get_json() if not data or 'control_way' not in data: return jsonify({ "success": False, "message": "缺少 control_way 参数" }), 400 control_way = data['control_way'] password = data.get('password') # 可选参数 if not isinstance(control_way, int) or control_way not in [0, 1, 2]: return jsonify({ "success": False, "message": "control_way 必须是 0(在线), 1(常开), 2(常闭)" }), 400 def task(): try: set_emergency_control(control_way, password) except Exception as e: logger.error(f"异步门禁控制异常 (Mode: {control_way}): {str(e)}") threading.Thread(target=task).start() return jsonify({ "success": True, "message": "已发送门禁控制指令 - 异步执行", "data": { "control_way": control_way } }) except Exception as e: logger.error(f"门禁控制异常: {str(e)}") return jsonify({ "success": False, "message": f"门禁控制失败: {str(e)}" }), 500 @door_bp.route('/api/door/open', methods=['POST']) @login_required def door_open_api(): """远程开门接口""" try: data = request.get_json() if not data or 'door_id' not in data: return jsonify({ "success": False, "message": "缺少 door_id 参数" }), 400 door_id = data['door_id'] password = data.get('password') # 可选参数 if not isinstance(door_id, int): return jsonify({ "success": False, "message": "door_id 必须是整数" }), 400 def task(): try: open_door_control(door_id, password) except Exception as e: logger.error(f"异步远程开门异常 (ID: {door_id}): {str(e)}") threading.Thread(target=task).start() return jsonify({ "success": True, "message": "已发送远程开门指令 - 异步执行", "data": { "door_id": door_id } }) except Exception as e: logger.error(f"远程开门异常: {str(e)}") return jsonify({ "success": False, "message": f"远程开门失败: {str(e)}" }), 500 from application.self_check_service import check_door_status @door_bp.route('/api/door/self_check', methods=['POST']) @login_required def self_check_api(): """门禁自检""" try: results = check_door_status() # 检查是否有错误返回 if results and "error" in results[0]: return jsonify({ "success": False, "message": results[0]["error"], "data": [] }), 500 return jsonify({ "success": True, "message": "门禁自检完成", "data": results }) except Exception as e: logger.error(f"门禁自检异常: {str(e)}") return jsonify({ "success": False, "message": f"门禁自检失败: {str(e)}" }), 500