wled_thread.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. # 这个模块是负责控制wled灯带的线程
  2. # 只有一个对外开放的函数,这个函数方法描述如下
  3. # 1.接收一个展品ID,0开始
  4. # 2.根据展品ID,只有哪个展品ID的灯效是呼吸灯效,其他灯效都设置为静止
  5. # 3.灯的颜色都是白色
  6. # 4.当触发10s后,所有展品都随机选择COLOR_RANDOM_WAVE = 5、COLOR_RANDOM_WINK = 7、COLOR_RANDOM_BREATH = 8三个灯效之一
  7. import threading
  8. import time
  9. import random
  10. from typing import Optional
  11. from hardware.wled_controller_module import WledController
  12. from hardware.wled_enum import LEDEffectType, RGBColor
  13. from utils.logger_config import logger
  14. class WledThreadSingleton:
  15. """WLED线程单例类,线程一直运行,通过修改内部变量控制播放"""
  16. _instance = None
  17. _lock = threading.Lock()
  18. def __new__(cls):
  19. if cls._instance is None:
  20. with cls._lock:
  21. if cls._instance is None:
  22. cls._instance = super(WledThreadSingleton, cls).__new__(cls)
  23. return cls._instance
  24. def __init__(self):
  25. # 避免重复初始化
  26. if hasattr(self, '_initialized'):
  27. return
  28. self._initialized = True
  29. self.controller = None
  30. self.thread = None
  31. self.is_running = False
  32. # 线程控制变量(需要加锁)
  33. self._data_lock = threading.Lock()
  34. self._current_exhibit_id = 0
  35. self._is_new_task = False
  36. self._should_stop = False
  37. # 启动工作线程
  38. self._start_worker_thread()
  39. def _start_worker_thread(self):
  40. """启动工作线程"""
  41. if self.thread is None or not self.thread.is_alive():
  42. self.is_running = True
  43. self.thread = threading.Thread(target=self._worker_loop, daemon=True)
  44. self.thread.start()
  45. logger.info("WLED工作线程已启动")
  46. def _initialize_controller(self):
  47. """初始化WLED控制器"""
  48. try:
  49. if self.controller is None:
  50. self.controller = WledController()
  51. logger.info("WLED控制器初始化成功")
  52. except Exception as e:
  53. logger.error(f"WLED控制器初始化失败: {e}")
  54. raise
  55. def _worker_loop(self):
  56. """工作线程主循环"""
  57. logger.info("WLED工作线程开始运行")
  58. try:
  59. # 初始化控制器
  60. self._initialize_controller()
  61. # 状态变量
  62. current_phase = "idle" # idle, breathing, random
  63. phase_start_time = 0
  64. breathing_duration = 10 # 呼吸灯效持续时间(秒)
  65. while self.is_running and not self._should_stop:
  66. try:
  67. # 检查是否有新任务
  68. with self._data_lock:
  69. is_new_task = self._is_new_task
  70. exhibit_id = self._current_exhibit_id
  71. if is_new_task:
  72. self._is_new_task = False
  73. # 处理新任务
  74. if is_new_task:
  75. logger.info(f"收到新任务,展品ID: {exhibit_id}")
  76. current_phase = "breathing"
  77. phase_start_time = time.time()
  78. self._set_breathing_effect(exhibit_id)
  79. # 检查阶段切换
  80. if current_phase == "breathing":
  81. elapsed_time = time.time() - phase_start_time
  82. if elapsed_time >= breathing_duration:
  83. logger.info("切换到随机灯效阶段")
  84. current_phase = "random"
  85. self._set_random_effects()
  86. # 短暂休眠,避免CPU占用过高
  87. time.sleep(0.1)
  88. except Exception as e:
  89. logger.error(f"工作线程循环异常: {e}")
  90. time.sleep(1)
  91. except Exception as e:
  92. logger.error(f"工作线程异常: {e}")
  93. finally:
  94. logger.info("WLED工作线程结束")
  95. def set_exhibit_task(self, exhibit_id: int) -> bool:
  96. """
  97. 设置新的展品任务
  98. Args:
  99. exhibit_id: 展品ID,从0开始
  100. Returns:
  101. bool: 设置是否成功
  102. """
  103. try:
  104. # 验证展品ID
  105. if not isinstance(exhibit_id, int) or exhibit_id < 0:
  106. logger.error(f"无效的展品ID: {exhibit_id}")
  107. return False
  108. # 检查展品ID是否在有效范围内
  109. if self.controller is None:
  110. self._initialize_controller()
  111. segments_count = self.controller.wled_state.get_segments_count()
  112. if exhibit_id >= segments_count:
  113. logger.error(f"展品ID {exhibit_id} 超出范围,最大ID为 {segments_count - 1}")
  114. return False
  115. # 设置新任务
  116. with self._data_lock:
  117. self._current_exhibit_id = exhibit_id
  118. self._is_new_task = True
  119. logger.info(f"设置新任务,展品ID: {exhibit_id}")
  120. return True
  121. except Exception as e:
  122. logger.error(f"设置展品任务失败: {e}")
  123. return False
  124. def stop_effect(self):
  125. """停止当前灯效"""
  126. with self._data_lock:
  127. self._should_stop = True
  128. logger.info("停止灯效控制")
  129. def _set_breathing_effect(self, exhibit_id: int):
  130. """
  131. 设置呼吸灯效
  132. Args:
  133. exhibit_id: 展品ID
  134. """
  135. try:
  136. logger.info(f"设置展品 {exhibit_id} 为呼吸灯效")
  137. # 获取所有分段
  138. segments = self.controller.wled_state.seg
  139. for i, segment in enumerate(segments):
  140. if i == exhibit_id:
  141. # 目标展品:设置为呼吸灯效,白色
  142. segment.set_color(RGBColor.WHITE.rgb_tuple)
  143. segment.set_effect(LEDEffectType.BREATH.value, speed=200, intensity=255)
  144. segment.on = True
  145. logger.info(f"分段 {i} ({segment}) 设置为呼吸灯效")
  146. else:
  147. # 其他展品:设置为静止,白色
  148. segment.set_color(RGBColor.WHITE.rgb_tuple)
  149. segment.set_effect(LEDEffectType.SOLID.value, speed=0, intensity=0)
  150. segment.on = True
  151. logger.info(f"分段 {i} ({segment}) 设置为静止")
  152. # 发送状态到设备
  153. success = self.controller.send_state()
  154. if success:
  155. logger.info("呼吸灯效设置成功")
  156. else:
  157. logger.error("呼吸灯效设置失败")
  158. except Exception as e:
  159. logger.error(f"设置呼吸灯效失败: {e}")
  160. def _set_random_effects(self):
  161. """
  162. 设置随机灯效
  163. """
  164. try:
  165. logger.info(f"设置所有展品为随机灯效")
  166. # 随机选择灯效
  167. random_effects = [
  168. LEDEffectType.COLOR_RANDOM_BREATH.value
  169. ]
  170. # 获取所有分段
  171. segments = self.controller.wled_state.seg
  172. for i, segment in enumerate(segments):
  173. # 为每个展品随机选择一种灯效
  174. selected_effect = random.choice(random_effects)
  175. segment.set_color(RGBColor.WHITE.rgb_tuple)
  176. segment.set_effect(selected_effect, speed=200, intensity=255)
  177. segment.on = True
  178. logger.info(f"分段 {i} ({segment}) 设置为随机灯效 {selected_effect}")
  179. # 发送状态到设备
  180. self.controller.get_wled_state().set_transition_time(10000)
  181. success = self.controller.send_state()
  182. if success:
  183. logger.info("所有展品的随机灯效设置成功")
  184. else:
  185. logger.error("随机灯效设置失败")
  186. except Exception as e:
  187. logger.error(f"设置随机灯效失败: {e}")
  188. # 全局单例实例
  189. _wled_thread = WledThreadSingleton()
  190. def start_exhibit_led_effect(exhibit_id: int) -> bool:
  191. """
  192. 启动展品LED灯效控制
  193. 这是模块的主要对外接口函数,功能如下:
  194. 1. 接收一个展品ID,从0开始
  195. 2. 根据展品ID,只有该展品ID的灯效是呼吸灯效,其他灯效都设置为静止
  196. 3. 灯的颜色都是白色
  197. 4. 当触发10秒后,所有展品都随机选择COLOR_RANDOM_WAVE = 5、COLOR_RANDOM_WINK = 7、COLOR_RANDOM_BREATH = 8三个灯效之一
  198. Args:
  199. exhibit_id: 展品ID,从0开始
  200. Returns:
  201. bool: 启动是否成功
  202. """
  203. return _wled_thread.set_exhibit_task(exhibit_id)
  204. def stop_led_effect():
  205. """
  206. 停止当前LED灯效
  207. Returns:
  208. bool: 停止是否成功
  209. """
  210. _wled_thread.stop_effect()
  211. return True
  212. def is_effect_running() -> bool:
  213. """
  214. 检查是否有灯效正在运行
  215. Returns:
  216. bool: 是否有灯效正在运行
  217. """
  218. return _wled_thread.is_running