door.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. from flask import Blueprint, jsonify, request
  2. from hardware.door_module import set_emergency_control, open_door_control
  3. from utils.logger_config import logger
  4. from api.utils import login_required
  5. import threading
  6. door_bp = Blueprint('door', __name__)
  7. @door_bp.route('/api/door/control', methods=['POST'])
  8. @login_required
  9. def door_control_api():
  10. """门禁控制接口"""
  11. try:
  12. data = request.get_json()
  13. if not data or 'control_way' not in data:
  14. return jsonify({
  15. "success": False,
  16. "message": "缺少 control_way 参数"
  17. }), 400
  18. control_way = data['control_way']
  19. password = data.get('password') # 可选参数
  20. if not isinstance(control_way, int) or control_way not in [0, 1, 2]:
  21. return jsonify({
  22. "success": False,
  23. "message": "control_way 必须是 0(在线), 1(常开), 2(常闭)"
  24. }), 400
  25. def task():
  26. try:
  27. set_emergency_control(control_way, password)
  28. except Exception as e:
  29. logger.error(f"异步门禁控制异常 (Mode: {control_way}): {str(e)}")
  30. threading.Thread(target=task).start()
  31. return jsonify({
  32. "success": True,
  33. "message": "已发送门禁控制指令 - 异步执行",
  34. "data": {
  35. "control_way": control_way
  36. }
  37. })
  38. except Exception as e:
  39. logger.error(f"门禁控制异常: {str(e)}")
  40. return jsonify({
  41. "success": False,
  42. "message": f"门禁控制失败: {str(e)}"
  43. }), 500
  44. @door_bp.route('/api/door/open', methods=['POST'])
  45. @login_required
  46. def door_open_api():
  47. """远程开门接口"""
  48. try:
  49. data = request.get_json()
  50. if not data or 'door_id' not in data:
  51. return jsonify({
  52. "success": False,
  53. "message": "缺少 door_id 参数"
  54. }), 400
  55. door_id = data['door_id']
  56. password = data.get('password') # 可选参数
  57. if not isinstance(door_id, int):
  58. return jsonify({
  59. "success": False,
  60. "message": "door_id 必须是整数"
  61. }), 400
  62. def task():
  63. try:
  64. open_door_control(door_id, password)
  65. except Exception as e:
  66. logger.error(f"异步远程开门异常 (ID: {door_id}): {str(e)}")
  67. threading.Thread(target=task).start()
  68. return jsonify({
  69. "success": True,
  70. "message": "已发送远程开门指令 - 异步执行",
  71. "data": {
  72. "door_id": door_id
  73. }
  74. })
  75. except Exception as e:
  76. logger.error(f"远程开门异常: {str(e)}")
  77. return jsonify({
  78. "success": False,
  79. "message": f"远程开门失败: {str(e)}"
  80. }), 500