| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- import requests
- import yaml
- import os
- from utils.logger_config import logger
- CONFIG_FILE = 'door_config.yaml'
- def load_config():
- """加载门禁配置"""
- if not os.path.exists(CONFIG_FILE):
- logger.error(f"配置文件 {CONFIG_FILE} 不存在")
- return None
-
- try:
- with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
- config = yaml.safe_load(f)
- return config.get('door')
- except Exception as e:
- logger.error(f"读取配置文件失败: {e}")
- return None
- def set_emergency_control(control_way, password=None):
- """
- 设置紧急开关门方式
- :param control_way: 0:在线, 1:常开, 2:常闭
- :param password: 密码,如果未提供则从配置读取
- :return: (success, message)
- """
- config = load_config()
- if not config:
- return False, "无法加载配置"
- ip = config.get('ip')
- port = config.get('port', 14460)
- stored_password = config.get('password')
-
- # 优先使用传入的密码,否则使用配置文件的密码
- # 如果两者都为 None,则默认为空字符串
- final_password = password if password is not None else (stored_password if stored_password is not None else "")
-
- if not ip:
- return False, "配置中缺少IP地址"
-
- # 允许空密码,不再检查 if not final_password
- url = f"http://{ip}:{port}/setEmergencyControl"
-
- params = {
- "pass": final_password,
- "controlWay": control_way
- }
-
- try:
- # 根据用户提供的示例,请求体为 JSON 格式
- response = requests.post(url, json=params, timeout=5)
-
- if response.status_code == 200:
- try:
- resp_data = response.json()
- logger.info(f"门禁控制请求成功: {control_way}, 响应: {resp_data}")
-
- if resp_data.get('result') is True:
- return True, "设置成功"
- else:
- error_msg = resp_data.get('message', '未知错误')
- return False, f"设置失败: {error_msg}"
- except Exception as e:
- logger.error(f"解析响应JSON失败: {e}, 原始内容: {response.text}")
- return False, f"响应格式错误: {response.text}"
- else:
- logger.error(f"门禁控制请求失败: {response.status_code}, {response.text}")
- return False, f"请求失败, 状态码: {response.status_code}"
-
- except Exception as e:
- logger.error(f"门禁控制异常: {e}")
- return False, f"请求异常: {str(e)}"
- def open_door_control(door_id, password=None):
- """
- 远程开门控制
- :param door_id: 门编号
- :param password: 密码,如果未提供则从配置读取
- :return: (success, message)
- """
- config = load_config()
- if not config:
- return False, "无法加载配置"
- ip = config.get('ip')
- port = config.get('port', 14460)
- stored_password = config.get('password')
-
- # 优先使用传入的密码,否则使用配置文件的密码
- # 如果两者都为 None,则默认为空字符串
- final_password = password if password is not None else (stored_password if stored_password is not None else "")
-
- if not ip:
- return False, "配置中缺少IP地址"
-
- # 允许空密码,不再检查 if not final_password
- url = f"http://{ip}:{port}/openDoorControl"
-
- params = {
- "pass": final_password,
- "doorId": door_id
- }
-
- try:
- # 请求体为 JSON 格式
- response = requests.post(url, json=params, timeout=5)
-
- if response.status_code == 200:
- try:
- resp_data = response.json()
- logger.info(f"远程开门请求成功: {door_id}, 响应: {resp_data}")
-
- if resp_data.get('result') is True:
- return True, "远程开门成功"
- else:
- error_msg = resp_data.get('message', '未知错误')
- return False, f"远程开门失败: {error_msg}"
- except Exception as e:
- logger.error(f"解析响应JSON失败: {e}, 原始内容: {response.text}")
- return False, f"响应格式错误: {response.text}"
- else:
- logger.error(f"远程开门请求失败: {response.status_code}, {response.text}")
- return False, f"请求失败, 状态码: {response.status_code}"
-
- except Exception as e:
- logger.error(f"远程开门异常: {e}")
- return False, f"请求异常: {str(e)}"
|