| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- # 这个模块是负责控制wled灯带的线程
- # 只有一个对外开放的函数,这个函数方法描述如下
- # 1.接收一个展品ID,0开始
- # 2.根据展品ID,只有哪个展品ID的灯效是呼吸灯效,其他灯效都设置为静止
- # 3.灯的颜色都是白色
- # 4.当触发10s后,所有展品都随机选择COLOR_RANDOM_WAVE = 5、COLOR_RANDOM_WINK = 7、COLOR_RANDOM_BREATH = 8三个灯效之一
- import threading
- import time
- import random
- from typing import Optional
- from hardware.wled_controller_module import WledController
- from hardware.wled_enum import LEDEffectType, RGBColor
- from utils.logger_config import logger
- class WledThreadSingleton:
- """WLED线程单例类,线程一直运行,通过修改内部变量控制播放"""
-
- _instance = None
- _lock = threading.Lock()
-
- def __new__(cls):
- if cls._instance is None:
- with cls._lock:
- if cls._instance is None:
- cls._instance = super(WledThreadSingleton, cls).__new__(cls)
- return cls._instance
-
- def __init__(self):
- # 避免重复初始化
- if hasattr(self, '_initialized'):
- return
-
- self._initialized = True
- self.controller = None
- self.thread = None
- self.is_running = False
-
- # 线程控制变量(需要加锁)
- self._data_lock = threading.Lock()
- self._current_exhibit_id = 0
- self._is_new_task = False
- self._should_stop = False
-
- # 启动工作线程
- self._start_worker_thread()
-
- def _start_worker_thread(self):
- """启动工作线程"""
- if self.thread is None or not self.thread.is_alive():
- self.is_running = True
- self.thread = threading.Thread(target=self._worker_loop, daemon=True)
- self.thread.start()
- logger.info("WLED工作线程已启动")
-
- def _initialize_controller(self):
- """初始化WLED控制器"""
- try:
- if self.controller is None:
- self.controller = WledController()
- logger.info("WLED控制器初始化成功")
- except Exception as e:
- logger.error(f"WLED控制器初始化失败: {e}")
- raise
-
- def _worker_loop(self):
- """工作线程主循环"""
- logger.info("WLED工作线程开始运行")
-
- try:
- # 初始化控制器
- self._initialize_controller()
-
- # 状态变量
- current_phase = "idle" # idle, breathing, random
- phase_start_time = 0
- breathing_duration = 10 # 呼吸灯效持续时间(秒)
-
- while self.is_running and not self._should_stop:
- try:
- # 检查是否有新任务
- with self._data_lock:
- is_new_task = self._is_new_task
- exhibit_id = self._current_exhibit_id
- if is_new_task:
- self._is_new_task = False
-
- # 处理新任务
- if is_new_task:
- logger.info(f"收到新任务,展品ID: {exhibit_id}")
- current_phase = "breathing"
- phase_start_time = time.time()
- self._set_breathing_effect(exhibit_id)
-
- # 检查阶段切换
- if current_phase == "breathing":
- elapsed_time = time.time() - phase_start_time
- if elapsed_time >= breathing_duration:
- logger.info("切换到随机灯效阶段")
- current_phase = "random"
- self._set_random_effects()
-
- # 短暂休眠,避免CPU占用过高
- time.sleep(0.1)
-
- except Exception as e:
- logger.error(f"工作线程循环异常: {e}")
- time.sleep(1)
-
- except Exception as e:
- logger.error(f"工作线程异常: {e}")
- finally:
- logger.info("WLED工作线程结束")
-
- def set_exhibit_task(self, exhibit_id: int) -> bool:
- """
- 设置新的展品任务
-
- Args:
- exhibit_id: 展品ID,从0开始
-
- Returns:
- bool: 设置是否成功
- """
- try:
- # 验证展品ID
- if not isinstance(exhibit_id, int) or exhibit_id < 0:
- logger.error(f"无效的展品ID: {exhibit_id}")
- return False
-
- # 检查展品ID是否在有效范围内
- if self.controller is None:
- self._initialize_controller()
-
- segments_count = self.controller.wled_state.get_segments_count()
- if exhibit_id >= segments_count:
- logger.error(f"展品ID {exhibit_id} 超出范围,最大ID为 {segments_count - 1}")
- return False
-
- # 设置新任务
- with self._data_lock:
- self._current_exhibit_id = exhibit_id
- self._is_new_task = True
-
- logger.info(f"设置新任务,展品ID: {exhibit_id}")
- return True
-
- except Exception as e:
- logger.error(f"设置展品任务失败: {e}")
- return False
-
- def stop_effect(self):
- """停止当前灯效"""
- with self._data_lock:
- self._should_stop = True
- logger.info("停止灯效控制")
-
- def _set_breathing_effect(self, exhibit_id: int):
- """
- 设置呼吸灯效
-
- Args:
- exhibit_id: 展品ID
- """
- try:
- logger.info(f"设置展品 {exhibit_id} 为呼吸灯效")
-
- # 获取所有分段
- segments = self.controller.wled_state.seg
-
- for i, segment in enumerate(segments):
- if i == exhibit_id:
- # 目标展品:设置为呼吸灯效,白色
- segment.set_color(RGBColor.WHITE.rgb_tuple)
- segment.set_effect(LEDEffectType.BREATH.value, speed=200, intensity=255)
- segment.on = True
- logger.info(f"分段 {i} ({segment}) 设置为呼吸灯效")
- else:
- # 其他展品:设置为静止,白色
- segment.set_color(RGBColor.WHITE.rgb_tuple)
- segment.set_effect(LEDEffectType.SOLID.value, speed=0, intensity=0)
- segment.on = True
- logger.info(f"分段 {i} ({segment}) 设置为静止")
-
- # 发送状态到设备
- success = self.controller.send_state()
- if success:
- logger.info("呼吸灯效设置成功")
- else:
- logger.error("呼吸灯效设置失败")
-
- except Exception as e:
- logger.error(f"设置呼吸灯效失败: {e}")
-
- def _set_random_effects(self):
- """
- 设置随机灯效
- """
- try:
- logger.info(f"设置所有展品为随机灯效")
-
- # 随机选择灯效
- random_effects = [
- LEDEffectType.COLOR_RANDOM_BREATH.value
- ]
-
- # 获取所有分段
- segments = self.controller.wled_state.seg
-
- for i, segment in enumerate(segments):
- # 为每个展品随机选择一种灯效
- selected_effect = random.choice(random_effects)
- segment.set_color(RGBColor.WHITE.rgb_tuple)
- segment.set_effect(selected_effect, speed=200, intensity=255)
- segment.on = True
- logger.info(f"分段 {i} ({segment}) 设置为随机灯效 {selected_effect}")
-
- # 发送状态到设备
- self.controller.get_wled_state().set_transition_time(10000)
- success = self.controller.send_state()
- if success:
- logger.info("所有展品的随机灯效设置成功")
- else:
- logger.error("随机灯效设置失败")
-
- except Exception as e:
- logger.error(f"设置随机灯效失败: {e}")
- # 全局单例实例
- _wled_thread = WledThreadSingleton()
- def start_exhibit_led_effect(exhibit_id: int) -> bool:
- """
- 启动展品LED灯效控制
-
- 这是模块的主要对外接口函数,功能如下:
- 1. 接收一个展品ID,从0开始
- 2. 根据展品ID,只有该展品ID的灯效是呼吸灯效,其他灯效都设置为静止
- 3. 灯的颜色都是白色
- 4. 当触发10秒后,所有展品都随机选择COLOR_RANDOM_WAVE = 5、COLOR_RANDOM_WINK = 7、COLOR_RANDOM_BREATH = 8三个灯效之一
-
- Args:
- exhibit_id: 展品ID,从0开始
-
- Returns:
- bool: 启动是否成功
- """
- return _wled_thread.set_exhibit_task(exhibit_id)
- def stop_led_effect():
- """
- 停止当前LED灯效
-
- Returns:
- bool: 停止是否成功
- """
- _wled_thread.stop_effect()
- return True
- def is_effect_running() -> bool:
- """
- 检查是否有灯效正在运行
-
- Returns:
- bool: 是否有灯效正在运行
- """
- return _wled_thread.is_running
|