door.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. from flask import Blueprint, jsonify, request
  2. from hardware.door_module import set_emergency_control, open_door_control, load_config
  3. from utils.logger_config import logger
  4. from api.utils import login_required, ping_ip
  5. import threading
  6. door_bp = Blueprint('door', __name__)
  7. @door_bp.route('/api/door/control', methods=['POST'])
  8. @login_required
  9. def door_control_api():
  10. """门禁控制接口"""
  11. try:
  12. data = request.get_json()
  13. if not data or 'control_way' not in data:
  14. return jsonify({
  15. "success": False,
  16. "message": "缺少 control_way 参数"
  17. }), 400
  18. control_way = data['control_way']
  19. password = data.get('password') # 可选参数
  20. if not isinstance(control_way, int) or control_way not in [0, 1, 2]:
  21. return jsonify({
  22. "success": False,
  23. "message": "control_way 必须是 0(在线), 1(常开), 2(常闭)"
  24. }), 400
  25. def task():
  26. try:
  27. set_emergency_control(control_way, password)
  28. except Exception as e:
  29. logger.error(f"异步门禁控制异常 (Mode: {control_way}): {str(e)}")
  30. threading.Thread(target=task).start()
  31. return jsonify({
  32. "success": True,
  33. "message": "已发送门禁控制指令 - 异步执行",
  34. "data": {
  35. "control_way": control_way
  36. }
  37. })
  38. except Exception as e:
  39. logger.error(f"门禁控制异常: {str(e)}")
  40. return jsonify({
  41. "success": False,
  42. "message": f"门禁控制失败: {str(e)}"
  43. }), 500
  44. @door_bp.route('/api/door/open', methods=['POST'])
  45. @login_required
  46. def door_open_api():
  47. """远程开门接口"""
  48. try:
  49. data = request.get_json()
  50. if not data or 'door_id' not in data:
  51. return jsonify({
  52. "success": False,
  53. "message": "缺少 door_id 参数"
  54. }), 400
  55. door_id = data['door_id']
  56. password = data.get('password') # 可选参数
  57. if not isinstance(door_id, int):
  58. return jsonify({
  59. "success": False,
  60. "message": "door_id 必须是整数"
  61. }), 400
  62. def task():
  63. try:
  64. open_door_control(door_id, password)
  65. except Exception as e:
  66. logger.error(f"异步远程开门异常 (ID: {door_id}): {str(e)}")
  67. threading.Thread(target=task).start()
  68. return jsonify({
  69. "success": True,
  70. "message": "已发送远程开门指令 - 异步执行",
  71. "data": {
  72. "door_id": door_id
  73. }
  74. })
  75. except Exception as e:
  76. logger.error(f"远程开门异常: {str(e)}")
  77. return jsonify({
  78. "success": False,
  79. "message": f"远程开门失败: {str(e)}"
  80. }), 500
  81. from application.self_check_service import check_door_status
  82. @door_bp.route('/api/door/self_check', methods=['POST'])
  83. @login_required
  84. def self_check_api():
  85. """门禁自检"""
  86. try:
  87. results = check_door_status()
  88. # 检查是否有错误返回
  89. if results and "error" in results[0]:
  90. return jsonify({
  91. "success": False,
  92. "message": results[0]["error"],
  93. "data": []
  94. }), 500
  95. return jsonify({
  96. "success": True,
  97. "message": "门禁自检完成",
  98. "data": results
  99. })
  100. except Exception as e:
  101. logger.error(f"门禁自检异常: {str(e)}")
  102. return jsonify({
  103. "success": False,
  104. "message": f"门禁自检失败: {str(e)}"
  105. }), 500