wled_controller_module.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. import yaml
  2. import os
  3. import requests
  4. import time
  5. from typing import Optional, Dict, Any
  6. from .wled_module import WledState, WledSegment
  7. from utils.logger_config import logger
  8. class WledController:
  9. """
  10. WLED控制器类
  11. 负责从配置文件读取设置,生成WledState对象,并提供设备控制功能
  12. """
  13. def __init__(self, config_path: str = "led_config.yaml"):
  14. """
  15. 初始化WLED控制器
  16. Args:
  17. config_path: 配置文件路径,默认为led_config.yaml
  18. Raises:
  19. FileNotFoundError: 配置文件不存在
  20. KeyError: 必需的配置项缺失
  21. ValueError: 配置值无效
  22. """
  23. self.config_path = config_path
  24. self.config = self._load_config()
  25. # 验证必需的配置项
  26. if 'wled_ip' not in self.config:
  27. error_msg = "配置文件中缺少必需的 'wled_ip' 配置项"
  28. logger.error(error_msg)
  29. raise KeyError(error_msg)
  30. if 'led_group_count' not in self.config:
  31. error_msg = "配置文件中缺少必需的 'led_group_count' 配置项"
  32. logger.error(error_msg)
  33. raise KeyError(error_msg)
  34. if 'segments' not in self.config:
  35. error_msg = "配置文件中缺少必需的 'segments' 配置项"
  36. logger.error(error_msg)
  37. raise KeyError(error_msg)
  38. # 验证配置值
  39. self.wled_ip = self.config['wled_ip']
  40. if not isinstance(self.wled_ip, str) or not self.wled_ip.strip():
  41. error_msg = f"无效的 wled_ip 配置值: {self.wled_ip}"
  42. logger.error(error_msg)
  43. raise ValueError(error_msg)
  44. self.led_group_count = self.config['led_group_count']
  45. if not isinstance(self.led_group_count, int) or self.led_group_count <= 0:
  46. error_msg = f"无效的 led_group_count 配置值: {self.led_group_count},必须为正整数"
  47. logger.error(error_msg)
  48. raise ValueError(error_msg)
  49. self.segments_config = self.config['segments']
  50. if not isinstance(self.segments_config, list):
  51. error_msg = f"无效的 segments 配置值: {self.segments_config},必须为列表"
  52. logger.error(error_msg)
  53. raise ValueError(error_msg)
  54. if not self.segments_config:
  55. error_msg = "segments 配置不能为空"
  56. logger.error(error_msg)
  57. raise ValueError(error_msg)
  58. # 按配置文件分段
  59. self.wled_state = self._create_wled_state()
  60. # 创建合并所有分段的状态对象,用于控制整条灯带
  61. self.wled_state_all = self.wled_state.merge_segments()
  62. logger.info(f"WLED控制器初始化完成,IP: {self.wled_ip}")
  63. def _load_config(self) -> Dict[str, Any]:
  64. """
  65. 从YAML配置文件加载配置
  66. Returns:
  67. dict: 配置字典
  68. Raises:
  69. FileNotFoundError: 配置文件不存在
  70. yaml.YAMLError: YAML解析错误
  71. Exception: 其他加载错误
  72. """
  73. if not os.path.exists(self.config_path):
  74. error_msg = f"配置文件不存在: {self.config_path}"
  75. logger.error(error_msg)
  76. raise FileNotFoundError(error_msg)
  77. try:
  78. with open(self.config_path, 'r', encoding='utf-8') as file:
  79. config = yaml.safe_load(file)
  80. if config is None:
  81. error_msg = f"配置文件为空或格式错误: {self.config_path}"
  82. logger.error(error_msg)
  83. raise ValueError(error_msg)
  84. logger.info(f"成功加载配置文件: {self.config_path}")
  85. return config
  86. except yaml.YAMLError as e:
  87. error_msg = f"YAML解析错误: {e}"
  88. logger.error(error_msg)
  89. raise
  90. except Exception as e:
  91. error_msg = f"加载配置文件失败: {e}"
  92. logger.error(error_msg)
  93. raise
  94. def _create_wled_state(self) -> WledState:
  95. """
  96. 根据配置文件创建WledState对象
  97. Returns:
  98. WledState: 配置好的WLED状态对象
  99. Raises:
  100. KeyError: 分段配置缺少必需字段
  101. ValueError: 分段配置值无效
  102. """
  103. # 创建基础状态
  104. wled_state = WledState(
  105. on=True,
  106. brightness=255,
  107. transition=10
  108. )
  109. # 根据配置创建分段
  110. for i, segment_config in enumerate(self.segments_config):
  111. # 验证分段配置的必需字段
  112. required_fields = ['id', 'start', 'stop', 'name']
  113. for field in required_fields:
  114. if field not in segment_config:
  115. error_msg = f"分段配置 {i} 缺少必需的字段 '{field}'"
  116. logger.error(error_msg)
  117. raise KeyError(error_msg)
  118. # 验证分段配置值
  119. segment_id = segment_config['id']
  120. if not isinstance(segment_id, int):
  121. error_msg = f"分段 {i} 的 id 必须是整数,当前值: {segment_id}"
  122. logger.error(error_msg)
  123. raise ValueError(error_msg)
  124. start = segment_config['start']
  125. if not isinstance(start, int) or start < 0:
  126. error_msg = f"分段 {i} 的 start 必须是非负整数,当前值: {start}"
  127. logger.error(error_msg)
  128. raise ValueError(error_msg)
  129. stop = segment_config['stop']
  130. if not isinstance(stop, int) or stop <= start:
  131. error_msg = f"分段 {i} 的 stop 必须是大于 start 的整数,当前值: stop={stop}, start={start}"
  132. logger.error(error_msg)
  133. raise ValueError(error_msg)
  134. name = segment_config['name']
  135. if not isinstance(name, str) or not name.strip():
  136. error_msg = f"分段 {i} 的 name 必须是非空字符串,当前值: {name}"
  137. logger.error(error_msg)
  138. raise ValueError(error_msg)
  139. segment = WledSegment(
  140. start=start,
  141. stop=stop,
  142. segment_id=segment_id,
  143. group=self.led_group_count,
  144. on=True,
  145. colors=[[0, 0, 255], [0, 0, 0], [0, 0, 0]], # 默认蓝色
  146. effect=6, # 默认彩虹特效
  147. speed=230,
  148. intensity=255,
  149. palette=0,
  150. reverse=False,
  151. selected=True
  152. )
  153. wled_state.add_segment(segment)
  154. logger.info(f"创建分段: {name} (ID: {segment.id}, LED: {segment.start}-{segment.stop-1})")
  155. logger.info(f"WledState创建完成,共{len(self.segments_config)}个分段")
  156. return wled_state
  157. def get_wled_state(self) -> WledState:
  158. """
  159. 获取当前WLED状态对象
  160. Returns:
  161. WledState: 当前状态对象
  162. """
  163. return self.wled_state
  164. def get_segment_by_id(self, id: int) -> WledSegment:
  165. """
  166. 根据ID获取分段
  167. Args:
  168. id: 分段ID
  169. Returns:
  170. WledSegment: 分段对象
  171. Raises:
  172. ValueError: 分段ID无效或分段不存在
  173. """
  174. if not isinstance(id, int) or id < 0:
  175. error_msg = f"分段ID必须是非负整数,当前值: {id}"
  176. logger.error(error_msg)
  177. raise ValueError(error_msg)
  178. return self.wled_state.get_segment(id)
  179. def set_segment_effect(self, segment_id: int, effect_id: int,
  180. speed: int = None, intensity: int = None):
  181. """
  182. 设置指定分段的特效
  183. Args:
  184. segment_id: 分段ID
  185. effect_id: 特效ID
  186. speed: 特效速度 (0-255)
  187. intensity: 特效强度 (0-255)
  188. Raises:
  189. ValueError: 分段ID无效或分段不存在
  190. """
  191. if not isinstance(segment_id, int) or segment_id < 0:
  192. error_msg = f"分段ID必须是非负整数,当前值: {segment_id}"
  193. logger.error(error_msg)
  194. raise ValueError(error_msg)
  195. segment = self.wled_state.get_segment(segment_id)
  196. if not segment:
  197. error_msg = f"未找到分段ID: {segment_id}"
  198. logger.error(error_msg)
  199. raise ValueError(error_msg)
  200. segment.set_effect(effect_id, speed, intensity)
  201. logger.info(f"分段{segment_id}特效已设置为: {effect_id}")
  202. def set_global_brightness(self, brightness: int):
  203. """
  204. 设置全局亮度
  205. Args:
  206. brightness: 亮度值 0-255
  207. Raises:
  208. ValueError: 亮度值无效
  209. """
  210. if not isinstance(brightness, int) or brightness < 0 or brightness > 255:
  211. error_msg = f"亮度值必须是0-255之间的整数,当前值: {brightness}"
  212. logger.error(error_msg)
  213. raise ValueError(error_msg)
  214. self.wled_state.set_global_brightness(brightness)
  215. logger.info(f"全局亮度已设置为: {brightness}")
  216. def toggle_power(self):
  217. """切换设备总开关状态"""
  218. self.wled_state.toggle_power()
  219. status = "开启" if self.wled_state.on else "关闭"
  220. logger.info(f"设备状态已切换为: {status}")
  221. def send_state(self, state: WledState = None, timeout: float = 5.0,
  222. retry_count: int = 3) -> bool:
  223. """
  224. 发送状态到WLED设备
  225. Args:
  226. state: 要发送的状态对象,默认为当前状态
  227. timeout: 请求超时时间(秒)
  228. retry_count: 重试次数
  229. Returns:
  230. bool: 发送是否成功
  231. """
  232. if state is None:
  233. state = self.wled_state
  234. payload = state.to_dict()
  235. url = f"http://{self.wled_ip}/json/state"
  236. headers = {"Content-Type": "application/json"}
  237. for attempt in range(retry_count + 1):
  238. try:
  239. logger.info(f"发送状态到设备 {self.wled_ip} (尝试 {attempt + 1}/{retry_count + 1})")
  240. logger.info(f"发送状态: {payload}")
  241. response = requests.post(
  242. url,
  243. headers=headers,
  244. json=payload,
  245. timeout=timeout,
  246. verify=False
  247. )
  248. if response.status_code == 200:
  249. logger.info(f"状态发送成功,状态码: {response.status_code}")
  250. return True
  251. else:
  252. logger.warning(f"状态发送失败,状态码: {response.status_code}")
  253. except requests.exceptions.RequestException as e:
  254. logger.error(f"发送请求失败 (尝试 {attempt + 1}): {e}")
  255. if attempt < retry_count:
  256. wait_time = 2 ** attempt # 指数退避
  257. logger.info(f"等待 {wait_time} 秒后重试...")
  258. time.sleep(wait_time)
  259. logger.error(f"所有重试均失败,无法发送状态到设备 {self.wled_ip}")
  260. return False
  261. def __str__(self) -> str:
  262. """返回控制器的字符串表示"""
  263. return f"WledController(IP: {self.wled_ip}, 分段数: {len(self.segments_config)})"