import json from typing import Any, Dict import requests import time import threading import yaml # Add yaml import import os import sys from utils.logger_config import logger class VideoInfo: def __init__(self, name, formula, description, video_duration, video_path, id=0): self.name = name self.formula = formula self.description = description self.video_duration = video_duration self.video_path = video_path self.id = id class KodiClient: def __init__(self, host="localhost", port=8080, username=None, password=None, id=0): self.host = host self.port = port self.id = id self.url = f"http://{host}:{port}/jsonrpc" self.headers = {'Content-Type': 'application/json'} # 非独立的客户端默认播放展品视频,独立情况下可能会播放图片或监控 self.isIndividual = False # 启动kodi的url self.my_app_name = "org.xbmc.kodi" # kodi的包名 self.mitv_startapp_url = f"http://{host}:6095/controller?action=startapp&&type=packagename&packagename={self.my_app_name}" if username and password: self.auth = (username, password) else: self.auth = None self.ready_event = threading.Event() def _send_request(self, method, params=None): data = { "jsonrpc": "2.0", "method": method, "id": 1, } if params: data["params"] = params response = requests.post( self.url, data=json.dumps(data), headers=self.headers, auth=self.auth ) return response.json() def _send_request_async(self, method, params=None): """在后台线程中发送JSON-RPC请求,立即返回,不阻塞当前调用。""" def _worker(): try: resp = self._send_request(method, params) logger.debug(f"[async] {method} 响应: {resp}") except Exception as e: logger.warning(f"[async] {method} 调用异常: {e}") t = threading.Thread(target=_worker, daemon=True) t.start() return True # 播放url图片 def play_url_image(self, image_url): """播放指定url的图片文件(异步派发)。""" params = {"item": {"file": image_url}} self._send_request_async("Player.Open", params) logger.info(f"[async] Player.Open(url image) 已派发: {image_url}") return {"queued": True, "file": image_url} # 播放rtsp视频流 def play_rtsp_video(self, rtsp_url): """播放指定rtsp视频流(异步派发)。""" params = {"item": {"file": rtsp_url}} self._send_request_async("Player.Open", params) logger.info(f"[async] Player.Open(rtsp) 已派发: {rtsp_url}") return {"queued": True, "file": rtsp_url} def play_video(self, video_path, loop=False): """播放指定路径的视频文件(异步派发),loop为是否循环播放""" params = {"item": {"file": video_path}} def _play_video_worker(): """异步工作线程:播放视频并根据loop参数设置循环""" try: response = self._send_request("Player.Open", params) logger.info(f"[async] Player.Open(video) 响应: {response}") if loop: # 如果需要循环播放,等待视频加载后设置循环 if response and response.get('result') == 'OK': logger.info("[async] Player.Open 调用成功,等待视频加载后设置循环...") # 等待视频加载 (根据观察到的 5 秒加载时间设置) wait_time = 5.5 logger.info(f"[async] 等待 {wait_time} 秒让视频加载...") time.sleep(wait_time) # 尝试直接使用 playerid: 1,并验证其类型 player_id_to_try = 1 try: properties_to_get = ["speed", "type"] get_props_params = {"playerid": player_id_to_try, "properties": properties_to_get} props_response = self._send_request("Player.GetProperties", get_props_params) logger.info(f"[async] Player.GetProperties (ID: {player_id_to_try}) 响应: {props_response}") if props_response and props_response.get('result') and props_response['result'].get('type') == 'video': logger.info(f"[async] 播放器 {player_id_to_try} 确认为视频播放器。尝试设置循环...") repeat_params = {"playerid": player_id_to_try, "repeat": "all"} repeat_response = self._send_request("Player.SetRepeat", repeat_params) logger.info(f"[async] Player.SetRepeat 响应: {repeat_response}") if repeat_response and repeat_response.get('result') == 'OK': logger.info("[async] 循环播放设置成功。") else: logger.warning(f"[async] 设置循环播放失败: {repeat_response}") elif props_response and props_response.get('error'): logger.warning(f"[async] 获取播放器 {player_id_to_try} 属性时出错: {props_response['error']} - 可能播放器尚未就绪或 ID 不正确") logger.warning("[async] 无法确认播放器状态,不设置循环。") else: logger.warning(f"[async] 播放器 {player_id_to_try} 不是预期的视频播放器或未找到。类型: {props_response.get('result', {}).get('type')},不设置循环。") except Exception as e: logger.error(f"[async] 在尝试获取属性或设置循环时发生错误: {str(e)}") else: logger.info(f"[async] Player.Open(video) 已派发,不设置循环: {video_path}") except Exception as e: logger.warning(f"[async] Player.Open(video) 调用异常: {e}") t = threading.Thread(target=_play_video_worker, daemon=True) t.start() logger.info(f"[async] Player.Open(video) 已派发: {video_path}, 循环: {loop}") return {"queued": True, "file": video_path, "loop": loop} def play_playlist_looped(self, video_paths): """清空播放列表,添加多个视频,并循环播放(整体异步派发)。""" if not isinstance(video_paths, list) or not video_paths: logger.error("错误:video_paths 必须是一个非空列表。") return None playlist_id = 1 def _playlist_worker(): try: logger.info(f"[async] Playlist.Clear -> {playlist_id}") self._send_request("Playlist.Clear", {"playlistid": playlist_id}) for vp in video_paths: logger.info(f"[async] Playlist.Add -> {vp}") self._send_request("Playlist.Add", {"playlistid": playlist_id, "item": {"file": vp}}) logger.info(f"[async] Player.Open playlist -> position 0") self._send_request("Player.Open", {"item": {"playlistid": playlist_id, "position": 0}}) logger.info(f"[async] Player.SetRepeat(all) -> playerid 1") self._send_request("Player.SetRepeat", {"playerid": 1, "repeat": "all"}) except Exception as e: logger.warning(f"[async] 播放列表派发异常: {e}") threading.Thread(target=_playlist_worker, daemon=True).start() return {"queued": True, "playlist": len(video_paths)} def _get_active_player_id(self): """获取当前活动的播放器ID""" try: response = self._send_request("Player.GetActivePlayers") logger.debug(f"Player.GetActivePlayers 响应: {response}") if response and response.get('result'): players = response['result'] if players: return players[0].get('playerid') logger.warning("未能从响应中找到有效的播放器ID。") return None except Exception as e: logger.error(f"获取活动播放器ID时出错: {str(e)}") return None def stop_playback(self): """停止当前播放(异步派发)。""" # 直接尝试默认播放器1,避免阻塞 self._send_request_async("Player.Stop", {"playerid": 1}) logger.info("[async] Player.Stop 已派发 (playerid=1)") return {"queued": True} def pause_playback(self): """暂停/继续播放(异步派发)。""" self._send_request_async("Player.PlayPause", {"playerid": 1}) logger.info("[async] Player.PlayPause 已派发 (playerid=1)") return {"queued": True} def set_volume(self, volume): """设置Kodi音量 (0-100)""" if not isinstance(volume, int) or not 0 <= volume <= 100: logger.error("错误:音量必须是 0 到 100 之间的整数。") return None params = {"volume": volume} # 异步发送,不阻塞调用方 self._send_request_async("Application.SetVolume", params) logger.info(f"Application.SetVolume ({volume}%) 已异步派发") return {"queued": True, "volume": volume} def get_player_state(self): """获取当前播放器状态(为避免阻塞,改为异步派发查询,并返回占位数据)。""" self._send_request_async('Player.GetActivePlayers') logger.info("[async] Player.GetActivePlayers 已派发(不阻塞)") # 占位返回,避免调用方阻塞;如需真实状态,应改造为回调/轮询机制 return {'queued': True} def set_ready(self): """设置客户端准备就绪""" self.ready_event.set() def wait_for_ready(self, timeout=10): """等待客户端准备就绪""" return self.ready_event.wait(timeout) def set_individual(self, isIndividual=False): """设置客户端是否为独立客户端""" self.isIndividual = isIndividual def get_individual(self): """获取客户端是否为独立客户端""" return self.isIndividual # 启动kodi def start_kodi(self): """启动kodi""" res = requests.get(self.mitv_startapp_url, timeout=3).json() logger.info(f"启动kodi响应: {res}") return res # kodi心跳检测,检查kodi客户端是否在线 def kodi_heartbeat_check(self): """检查kodi客户端是否在线""" try: # 使用JSON-RPC请求进行心跳检测,支持认证 data = { "jsonrpc": "2.0", "method": "JSONRPC.Version", "id": 1 } response = requests.post( self.url, data=json.dumps(data), headers=self.headers, auth=self.auth, timeout=3 ) # 检查HTTP状态码 if response.status_code != 200: logger.warning(f"kodi心跳检测失败: HTTP状态码 {response.status_code}") return False # 检查响应内容是否为空 if not response.text or response.text.strip() == '': logger.warning(f"kodi心跳检测失败: 响应内容为空") return False # 尝试解析JSON try: json_data = response.json() # JSONRPC.Version返回的结果包含version字段表示成功 if json_data.get('result') is not None: return True else: return False except json.JSONDecodeError as json_err: logger.warning(f"kodi心跳检测失败: JSON解析错误 - {json_err}, 响应内容: {response.text[:100]}") return False except requests.exceptions.Timeout: logger.warning(f"kodi心跳检测超时: 连接 {self.url} 超时") return False except requests.exceptions.ConnectionError: logger.warning(f"kodi心跳检测失败: 无法连接到 {self.url}") return False except Exception as e: logger.error(f"kodi心跳检测异常: {e}") return False class KodiClientManager(): def __init__(self): self.kodi_clients = [] self.video_infos = [] # 生产环境配置文件路径 self.kodi_config_path = 'kodi_config_prod.yaml' self.video_config_path = 'video_config_prod.yaml' # 开发环境配置文件路径 # self.kodi_config_path = 'kodi_config_test.yaml' # self.video_config_path = 'video_config_test.yaml' # 只有一台可以有声音,其他没有声音,这是音量 self.volume = 65 self._init_kodi_clients_from_config() self._init_video_infos_from_config() def set_volume(self, volume): # 设置播放视频的音量 self.volume = volume # 立即应用到符合条件的客户端 # 逻辑与 sync_play_video 中一致:第一台非独立设备设置音量,其他静音 # 注意:这里假设 kodi_clients 的顺序即为物理排列顺序 found_first_non_individual = False for client in self.kodi_clients: if client.get_individual(): continue if not found_first_non_individual: client.set_volume(self.volume) found_first_non_individual = True else: client.set_volume(0) def get_volume(self): """获取当前全局音量""" return self.volume def _init_video_infos_from_config(self): config = self._load_config(self.video_config_path) video_infos_config = config.get('video_infos', []) for video_info_config in video_infos_config: video_info = VideoInfo(video_info_config.get('name'), video_info_config.get('formula'), video_info_config.get('description'), video_info_config.get('video_duration'), video_info_config.get('video_path'), video_info_config.get('id')) logger.info(f"成功加载视频信息: {video_info.name}") self.video_infos.append(video_info) def _load_config(self, config_path) -> Dict[str, Any]: """ 从YAML配置文件加载配置 Returns: dict: 配置字典 Raises: FileNotFoundError: 配置文件不存在 yaml.YAMLError: YAML解析错误 Exception: 其他加载错误 """ if not os.path.exists(config_path): error_msg = f"配置文件不存在: {config_path}" logger.error(error_msg) raise FileNotFoundError(error_msg) try: with open(config_path, 'r', encoding='utf-8') as file: config = yaml.safe_load(file) if config is None: error_msg = f"配置文件为空或格式错误: {config_path}" logger.error(error_msg) raise ValueError(error_msg) logger.info(f"成功加载配置文件: {config_path}") return config except yaml.YAMLError as e: error_msg = f"YAML解析错误: {e}" logger.error(error_msg) raise except Exception as e: error_msg = f"加载配置文件失败: {e}" logger.error(error_msg) raise def _init_kodi_clients_from_config(self): config = self._load_config(self.kodi_config_path) kodi_servers_config = config.get('kodi_servers', []) if not kodi_servers_config: logger.error("未在 config.yaml 中找到有效的 Kodi 服务器配置,脚本将退出。") exit() # 创建 Kodi 客户端实例列表 for server_config in kodi_servers_config: client = KodiClient( host=server_config.get('ip', 'localhost'), port=server_config.get('port', 8080), username=server_config.get('username'), password=server_config.get('password'), id=server_config.get('id', 0) ) try: if hasattr(client, 'set_volume') and callable(getattr(client, 'set_volume')): client.set_volume(65) else: # 兼容旧构建:直接通过 JSON-RPC 设置音量 client._send_request("Application.SetVolume", {"volume": 65}) except Exception as e: logger.warning(f"设置音量时出现问题(已忽略以继续):{e}") self.kodi_clients.append(client) def _resolve_config_path(self, filename = "kodi_config.yaml"): """Return absolute path to config file, located next to the script/exe. When bundled with PyInstaller, sys.frozen is True and sys.executable points to the exe. In normal execution, use the directory of this file. """ try: if getattr(sys, "frozen", False): base_dir = os.path.dirname(sys.executable) else: base_dir = os.path.dirname(os.path.abspath(__file__)) return os.path.join(base_dir, filename) except Exception: # Fallback to current working directory return filename def sync_play_video(self, clients, video_path, loop=False): """同步播放视频""" # 创建一个共享的Event来控制所有客户端同时开始播放 start_event = threading.Event() logger.info(f"开始同步播放视频: {video_path}") # 创建播放线程 def play_thread(client): # 等待开始信号 start_event.wait() # 执行播放 result = client.play_video(video_path, loop=loop) logger.info(f"播放视频: {video_path}, 循环: {loop}") logger.info(f"播放视频结果: {result}") # 启动所有播放线程 threads = [] client_index = 0 for client in clients: if client.get_individual(): client_index += 1 continue # 只对第一台设置音量 if client == clients[client_index]: client.set_volume(self.volume) else: client.set_volume(0) thread = threading.Thread(target=play_thread, args=(client,)) threads.append(thread) thread.start() # 等待所有线程准备就绪 time.sleep(0.1) # 给线程一点时间启动 # 同时触发所有客户端开始播放 start_event.set() # 等待所有线程完成 for thread in threads: thread.join() # 指定某台播放url图片 def play_url_image_on_client(self, client, image_url): """指定某台播放url图片""" client.isIndividual = True return client.play_url_image(image_url) # 指定某台播放rtsp视频流 def play_rtsp_video_on_client(self, client, rtsp_url,volume=0): """指定某台播放rtsp视频流""" client.isIndividual = True client.set_volume(volume) return client.play_rtsp_video(rtsp_url) # 撤销所有客户端的独立状态 def revoke_individual_state(self): """撤销所有客户端的独立状态""" for client in self.kodi_clients: client.isIndividual = False client.stop_playback() # 启动所有kodi应用程序 def start_all_kodi_apps(self): """启动所有kodi应用程序""" for client in self.kodi_clients: client.start_kodi() # 检查所有kodi客户端是否在线,返回所有不在线的client_index集合 def check_all_kodi_clients_online(self): """检查所有kodi客户端是否在线,返回所有不在线的客户端索引集合""" offline_indices = set() client_index = 0 for client in self.kodi_clients: if not client.kodi_heartbeat_check(): offline_indices.add(client_index) client_index += 1 return offline_indices