# 这个模块是负责控制wled灯带的线程 # 只有一个对外开放的函数,这个函数方法描述如下 # 1.接收一个展品ID,0开始 # 2.根据展品ID,只有哪个展品ID的灯效是呼吸灯效,其他灯效都设置为静止 # 3.灯的颜色都是白色 # 4.当触发10s后,所有展品都随机选择COLOR_RANDOM_WAVE = 5、COLOR_RANDOM_WINK = 7、COLOR_RANDOM_BREATH = 8三个灯效之一 import threading import time import random from typing import Optional from hardware.wled_controller_module import WledController from hardware.wled_enum import LEDEffectType, RGBColor from utils.logger_config import logger class WledThreadSingleton: """WLED线程单例类,线程一直运行,通过修改内部变量控制播放""" _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super(WledThreadSingleton, cls).__new__(cls) return cls._instance def __init__(self): # 避免重复初始化 if hasattr(self, '_initialized'): return self._initialized = True self.controller = None self.thread = None self.is_running = False # 线程控制变量(需要加锁) self._data_lock = threading.Lock() self._current_exhibit_id = 0 self._is_new_task = False self._should_stop = False # 启动工作线程 self._start_worker_thread() def _start_worker_thread(self): """启动工作线程""" if self.thread is None or not self.thread.is_alive(): self.is_running = True self.thread = threading.Thread(target=self._worker_loop, daemon=True) self.thread.start() logger.info("WLED工作线程已启动") def _initialize_controller(self): """初始化WLED控制器""" try: if self.controller is None: self.controller = WledController() logger.info("WLED控制器初始化成功") except Exception as e: logger.error(f"WLED控制器初始化失败: {e}") raise def _worker_loop(self): """工作线程主循环""" logger.info("WLED工作线程开始运行") try: # 初始化控制器 self._initialize_controller() # 状态变量 current_phase = "idle" # idle, breathing, random phase_start_time = 0 breathing_duration = 10 # 呼吸灯效持续时间(秒) while self.is_running and not self._should_stop: try: # 检查是否有新任务 with self._data_lock: is_new_task = self._is_new_task exhibit_id = self._current_exhibit_id if is_new_task: self._is_new_task = False # 处理新任务 if is_new_task: logger.info(f"收到新任务,展品ID: {exhibit_id}") current_phase = "breathing" phase_start_time = time.time() self._set_breathing_effect(exhibit_id) # 检查阶段切换 if current_phase == "breathing": elapsed_time = time.time() - phase_start_time if elapsed_time >= breathing_duration: logger.info("切换到随机灯效阶段") current_phase = "random" self._set_random_effects() # 短暂休眠,避免CPU占用过高 time.sleep(0.1) except Exception as e: logger.error(f"工作线程循环异常: {e}") time.sleep(1) except Exception as e: logger.error(f"工作线程异常: {e}") finally: logger.info("WLED工作线程结束") def set_exhibit_task(self, exhibit_id: int) -> bool: """ 设置新的展品任务 Args: exhibit_id: 展品ID,从0开始 Returns: bool: 设置是否成功 """ try: # 验证展品ID if not isinstance(exhibit_id, int) or exhibit_id < 0: logger.error(f"无效的展品ID: {exhibit_id}") return False # 检查展品ID是否在有效范围内 if self.controller is None: self._initialize_controller() segments_count = self.controller.wled_state.get_segments_count() if exhibit_id >= segments_count: logger.error(f"展品ID {exhibit_id} 超出范围,最大ID为 {segments_count - 1}") return False # 设置新任务 with self._data_lock: self._current_exhibit_id = exhibit_id self._is_new_task = True logger.info(f"设置新任务,展品ID: {exhibit_id}") return True except Exception as e: logger.error(f"设置展品任务失败: {e}") return False def stop_effect(self): """停止当前灯效""" with self._data_lock: self._should_stop = True logger.info("停止灯效控制") def _set_breathing_effect(self, exhibit_id: int): """ 设置呼吸灯效 Args: exhibit_id: 展品ID """ try: logger.info(f"设置展品 {exhibit_id} 为呼吸灯效") # 获取所有分段 segments = self.controller.wled_state.seg for i, segment in enumerate(segments): if i == exhibit_id: # 目标展品:设置为呼吸灯效,白色 segment.set_color(RGBColor.WHITE.rgb_tuple) segment.set_effect(LEDEffectType.BREATH.value, speed=200, intensity=255) segment.on = True logger.info(f"分段 {i} ({segment}) 设置为呼吸灯效") else: # 其他展品:设置为静止,白色 segment.set_color(RGBColor.WHITE.rgb_tuple) segment.set_effect(LEDEffectType.SOLID.value, speed=0, intensity=0) segment.on = True logger.info(f"分段 {i} ({segment}) 设置为静止") # 发送状态到设备 success = self.controller.send_state() if success: logger.info("呼吸灯效设置成功") else: logger.error("呼吸灯效设置失败") except Exception as e: logger.error(f"设置呼吸灯效失败: {e}") def _set_random_effects(self): """ 设置随机灯效 """ try: logger.info(f"设置所有展品为随机灯效") # 随机选择灯效 random_effects = [ LEDEffectType.COLOR_RANDOM_BREATH.value ] # 获取所有分段 segments = self.controller.wled_state.seg for i, segment in enumerate(segments): # 为每个展品随机选择一种灯效 selected_effect = random.choice(random_effects) segment.set_color(RGBColor.WHITE.rgb_tuple) segment.set_effect(selected_effect, speed=200, intensity=255) segment.on = True logger.info(f"分段 {i} ({segment}) 设置为随机灯效 {selected_effect}") # 发送状态到设备 self.controller.get_wled_state().set_transition_time(10000) success = self.controller.send_state() if success: logger.info("所有展品的随机灯效设置成功") else: logger.error("随机灯效设置失败") except Exception as e: logger.error(f"设置随机灯效失败: {e}") # 全局单例实例 _wled_thread = WledThreadSingleton() def start_exhibit_led_effect(exhibit_id: int) -> bool: """ 启动展品LED灯效控制 这是模块的主要对外接口函数,功能如下: 1. 接收一个展品ID,从0开始 2. 根据展品ID,只有该展品ID的灯效是呼吸灯效,其他灯效都设置为静止 3. 灯的颜色都是白色 4. 当触发10秒后,所有展品都随机选择COLOR_RANDOM_WAVE = 5、COLOR_RANDOM_WINK = 7、COLOR_RANDOM_BREATH = 8三个灯效之一 Args: exhibit_id: 展品ID,从0开始 Returns: bool: 启动是否成功 """ return _wled_thread.set_exhibit_task(exhibit_id) def stop_led_effect(): """ 停止当前LED灯效 Returns: bool: 停止是否成功 """ _wled_thread.stop_effect() return True def is_effect_running() -> bool: """ 检查是否有灯效正在运行 Returns: bool: 是否有灯效正在运行 """ return _wled_thread.is_running