| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- from flask import Blueprint, jsonify, request
- from hardware.door_module import set_emergency_control, open_door_control, load_config
- from utils.logger_config import logger
- from api.utils import login_required, ping_ip
- import threading
- door_bp = Blueprint('door', __name__)
- @door_bp.route('/api/door/control', methods=['POST'])
- @login_required
- def door_control_api():
- """门禁控制接口"""
- try:
- data = request.get_json()
- if not data or 'control_way' not in data:
- return jsonify({
- "success": False,
- "message": "缺少 control_way 参数"
- }), 400
-
- control_way = data['control_way']
- password = data.get('password') # 可选参数
-
- if not isinstance(control_way, int) or control_way not in [0, 1, 2]:
- return jsonify({
- "success": False,
- "message": "control_way 必须是 0(在线), 1(常开), 2(常闭)"
- }), 400
- def task():
- try:
- set_emergency_control(control_way, password)
- except Exception as e:
- logger.error(f"异步门禁控制异常 (Mode: {control_way}): {str(e)}")
-
- threading.Thread(target=task).start()
-
- return jsonify({
- "success": True,
- "message": "已发送门禁控制指令 - 异步执行",
- "data": {
- "control_way": control_way
- }
- })
- except Exception as e:
- logger.error(f"门禁控制异常: {str(e)}")
- return jsonify({
- "success": False,
- "message": f"门禁控制失败: {str(e)}"
- }), 500
- @door_bp.route('/api/door/open', methods=['POST'])
- @login_required
- def door_open_api():
- """远程开门接口"""
- try:
- data = request.get_json()
- if not data or 'door_id' not in data:
- return jsonify({
- "success": False,
- "message": "缺少 door_id 参数"
- }), 400
-
- door_id = data['door_id']
- password = data.get('password') # 可选参数
-
- if not isinstance(door_id, int):
- return jsonify({
- "success": False,
- "message": "door_id 必须是整数"
- }), 400
- def task():
- try:
- open_door_control(door_id, password)
- except Exception as e:
- logger.error(f"异步远程开门异常 (ID: {door_id}): {str(e)}")
-
- threading.Thread(target=task).start()
-
- return jsonify({
- "success": True,
- "message": "已发送远程开门指令 - 异步执行",
- "data": {
- "door_id": door_id
- }
- })
- except Exception as e:
- logger.error(f"远程开门异常: {str(e)}")
- return jsonify({
- "success": False,
- "message": f"远程开门失败: {str(e)}"
- }), 500
- from application.self_check_service import check_door_status
- @door_bp.route('/api/door/self_check', methods=['POST'])
- @login_required
- def self_check_api():
- """门禁自检"""
- try:
- results = check_door_status()
-
- # 检查是否有错误返回
- if results and "error" in results[0]:
- return jsonify({
- "success": False,
- "message": results[0]["error"],
- "data": []
- }), 500
- return jsonify({
- "success": True,
- "message": "门禁自检完成",
- "data": results
- })
- except Exception as e:
- logger.error(f"门禁自检异常: {str(e)}")
- return jsonify({
- "success": False,
- "message": f"门禁自检失败: {str(e)}"
- }), 500
|