| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477 |
- import json
- from typing import Any, Dict
- import requests
- import time
- import threading
- import yaml # Add yaml import
- import os
- import sys
- from utils.logger_config import logger
- class VideoInfo:
- def __init__(self, name, formula, description, video_duration, video_path, id=0):
- self.name = name
- self.formula = formula
- self.description = description
- self.video_duration = video_duration
- self.video_path = video_path
- self.id = id
- class KodiClient:
- def __init__(self, host="localhost", port=8080, username=None, password=None, id=0):
- self.host = host
- self.port = port
- self.id = id
- self.url = f"http://{host}:{port}/jsonrpc"
- self.headers = {'Content-Type': 'application/json'}
- # 非独立的客户端默认播放展品视频,独立情况下可能会播放图片或监控
- self.isIndividual = False
- # 启动kodi的url
- self.my_app_name = "org.xbmc.kodi" # kodi的包名
- self.mitv_startapp_url = f"http://{host}:6095/controller?action=startapp&&type=packagename&packagename={self.my_app_name}"
-
- if username and password:
- self.auth = (username, password)
- else:
- self.auth = None
- self.ready_event = threading.Event()
- def _send_request(self, method, params=None):
- data = {
- "jsonrpc": "2.0",
- "method": method,
- "id": 1,
- }
- if params:
- data["params"] = params
- response = requests.post(
- self.url,
- data=json.dumps(data),
- headers=self.headers,
- auth=self.auth
- )
- return response.json()
- def _send_request_async(self, method, params=None):
- """在后台线程中发送JSON-RPC请求,立即返回,不阻塞当前调用。"""
- def _worker():
- try:
- resp = self._send_request(method, params)
- logger.debug(f"[async] {method} 响应: {resp}")
- except Exception as e:
- logger.warning(f"[async] {method} 调用异常: {e}")
- t = threading.Thread(target=_worker, daemon=True)
- t.start()
- return True
- # 播放url图片
- def play_url_image(self, image_url):
- """播放指定url的图片文件(异步派发)。"""
- params = {"item": {"file": image_url}}
- self._send_request_async("Player.Open", params)
- logger.info(f"[async] Player.Open(url image) 已派发: {image_url}")
- return {"queued": True, "file": image_url}
- # 播放rtsp视频流
- def play_rtsp_video(self, rtsp_url):
- """播放指定rtsp视频流(异步派发)。"""
- params = {"item": {"file": rtsp_url}}
- self._send_request_async("Player.Open", params)
- logger.info(f"[async] Player.Open(rtsp) 已派发: {rtsp_url}")
- return {"queued": True, "file": rtsp_url}
- def play_video(self, video_path, loop=False):
- """播放指定路径的视频文件(异步派发),loop为是否循环播放"""
- params = {"item": {"file": video_path}}
-
- def _play_video_worker():
- """异步工作线程:播放视频并根据loop参数设置循环"""
- try:
- response = self._send_request("Player.Open", params)
- logger.info(f"[async] Player.Open(video) 响应: {response}")
-
- if loop:
- # 如果需要循环播放,等待视频加载后设置循环
- if response and response.get('result') == 'OK':
- logger.info("[async] Player.Open 调用成功,等待视频加载后设置循环...")
- # 等待视频加载 (根据观察到的 5 秒加载时间设置)
- wait_time = 5.5
- logger.info(f"[async] 等待 {wait_time} 秒让视频加载...")
- time.sleep(wait_time)
- # 尝试直接使用 playerid: 1,并验证其类型
- player_id_to_try = 1
- try:
- properties_to_get = ["speed", "type"]
- get_props_params = {"playerid": player_id_to_try, "properties": properties_to_get}
- props_response = self._send_request("Player.GetProperties", get_props_params)
- logger.info(f"[async] Player.GetProperties (ID: {player_id_to_try}) 响应: {props_response}")
- if props_response and props_response.get('result') and props_response['result'].get('type') == 'video':
- logger.info(f"[async] 播放器 {player_id_to_try} 确认为视频播放器。尝试设置循环...")
- repeat_params = {"playerid": player_id_to_try, "repeat": "all"}
- repeat_response = self._send_request("Player.SetRepeat", repeat_params)
- logger.info(f"[async] Player.SetRepeat 响应: {repeat_response}")
- if repeat_response and repeat_response.get('result') == 'OK':
- logger.info("[async] 循环播放设置成功。")
- else:
- logger.warning(f"[async] 设置循环播放失败: {repeat_response}")
- elif props_response and props_response.get('error'):
- logger.warning(f"[async] 获取播放器 {player_id_to_try} 属性时出错: {props_response['error']} - 可能播放器尚未就绪或 ID 不正确")
- logger.warning("[async] 无法确认播放器状态,不设置循环。")
- else:
- logger.warning(f"[async] 播放器 {player_id_to_try} 不是预期的视频播放器或未找到。类型: {props_response.get('result', {}).get('type')},不设置循环。")
- except Exception as e:
- logger.error(f"[async] 在尝试获取属性或设置循环时发生错误: {str(e)}")
- else:
- logger.info(f"[async] Player.Open(video) 已派发,不设置循环: {video_path}")
- except Exception as e:
- logger.warning(f"[async] Player.Open(video) 调用异常: {e}")
-
- t = threading.Thread(target=_play_video_worker, daemon=True)
- t.start()
- logger.info(f"[async] Player.Open(video) 已派发: {video_path}, 循环: {loop}")
- return {"queued": True, "file": video_path, "loop": loop}
- def play_playlist_looped(self, video_paths):
- """清空播放列表,添加多个视频,并循环播放(整体异步派发)。"""
- if not isinstance(video_paths, list) or not video_paths:
- logger.error("错误:video_paths 必须是一个非空列表。")
- return None
- playlist_id = 1
- def _playlist_worker():
- try:
- logger.info(f"[async] Playlist.Clear -> {playlist_id}")
- self._send_request("Playlist.Clear", {"playlistid": playlist_id})
- for vp in video_paths:
- logger.info(f"[async] Playlist.Add -> {vp}")
- self._send_request("Playlist.Add", {"playlistid": playlist_id, "item": {"file": vp}})
- logger.info(f"[async] Player.Open playlist -> position 0")
- self._send_request("Player.Open", {"item": {"playlistid": playlist_id, "position": 0}})
- logger.info(f"[async] Player.SetRepeat(all) -> playerid 1")
- self._send_request("Player.SetRepeat", {"playerid": 1, "repeat": "all"})
- except Exception as e:
- logger.warning(f"[async] 播放列表派发异常: {e}")
- threading.Thread(target=_playlist_worker, daemon=True).start()
- return {"queued": True, "playlist": len(video_paths)}
- def _get_active_player_id(self):
- """获取当前活动的播放器ID"""
- try:
- response = self._send_request("Player.GetActivePlayers")
- logger.debug(f"Player.GetActivePlayers 响应: {response}")
- if response and response.get('result'):
- players = response['result']
- if players:
- return players[0].get('playerid')
- logger.warning("未能从响应中找到有效的播放器ID。")
- return None
- except Exception as e:
- logger.error(f"获取活动播放器ID时出错: {str(e)}")
- return None
- def stop_playback(self):
- """停止当前播放(异步派发)。"""
- # 直接尝试默认播放器1,避免阻塞
- self._send_request_async("Player.Stop", {"playerid": 1})
- logger.info("[async] Player.Stop 已派发 (playerid=1)")
- return {"queued": True}
- def pause_playback(self):
- """暂停/继续播放(异步派发)。"""
- self._send_request_async("Player.PlayPause", {"playerid": 1})
- logger.info("[async] Player.PlayPause 已派发 (playerid=1)")
- return {"queued": True}
- def set_volume(self, volume):
- """设置Kodi音量 (0-100)"""
- if not isinstance(volume, int) or not 0 <= volume <= 100:
- logger.error("错误:音量必须是 0 到 100 之间的整数。")
- return None
- params = {"volume": volume}
- # 异步发送,不阻塞调用方
- self._send_request_async("Application.SetVolume", params)
- logger.info(f"Application.SetVolume ({volume}%) 已异步派发")
- return {"queued": True, "volume": volume}
- def get_player_state(self):
- """获取当前播放器状态(为避免阻塞,改为异步派发查询,并返回占位数据)。"""
- self._send_request_async('Player.GetActivePlayers')
- logger.info("[async] Player.GetActivePlayers 已派发(不阻塞)")
- # 占位返回,避免调用方阻塞;如需真实状态,应改造为回调/轮询机制
- return {'queued': True}
- def set_ready(self):
- """设置客户端准备就绪"""
- self.ready_event.set()
- def wait_for_ready(self, timeout=10):
- """等待客户端准备就绪"""
- return self.ready_event.wait(timeout)
- def set_individual(self, isIndividual=False):
- """设置客户端是否为独立客户端"""
- self.isIndividual = isIndividual
- def get_individual(self):
- """获取客户端是否为独立客户端"""
- return self.isIndividual
- # 启动kodi
- def start_kodi(self):
- """启动kodi"""
- res = requests.get(self.mitv_startapp_url, timeout=3).json()
- logger.info(f"启动kodi响应: {res}")
- return res
- # kodi心跳检测,检查kodi客户端是否在线
- def kodi_heartbeat_check(self):
- """检查kodi客户端是否在线"""
- try:
- # 使用JSON-RPC请求进行心跳检测,支持认证
- data = {
- "jsonrpc": "2.0",
- "method": "JSONRPC.Version",
- "id": 1
- }
- response = requests.post(
- self.url,
- data=json.dumps(data),
- headers=self.headers,
- auth=self.auth,
- timeout=3
- )
- # 检查HTTP状态码
- if response.status_code != 200:
- logger.warning(f"kodi心跳检测失败: HTTP状态码 {response.status_code}")
- return False
- # 检查响应内容是否为空
- if not response.text or response.text.strip() == '':
- logger.warning(f"kodi心跳检测失败: 响应内容为空")
- return False
- # 尝试解析JSON
- try:
- json_data = response.json()
- # JSONRPC.Version返回的结果包含version字段表示成功
- if json_data.get('result') is not None:
- return True
- else:
- return False
- except json.JSONDecodeError as json_err:
- logger.warning(f"kodi心跳检测失败: JSON解析错误 - {json_err}, 响应内容: {response.text[:100]}")
- return False
- except requests.exceptions.Timeout:
- logger.warning(f"kodi心跳检测超时: 连接 {self.url} 超时")
- return False
- except requests.exceptions.ConnectionError:
- logger.warning(f"kodi心跳检测失败: 无法连接到 {self.url}")
- return False
- except Exception as e:
- logger.error(f"kodi心跳检测异常: {e}")
- return False
- class KodiClientManager():
- def __init__(self):
- self.kodi_clients = []
- self.video_infos = []
- # 生产环境配置文件路径
- self.kodi_config_path = 'kodi_config_prod.yaml'
- self.video_config_path = 'video_config_prod.yaml'
- # 开发环境配置文件路径
- # self.kodi_config_path = 'kodi_config_test.yaml'
- # self.video_config_path = 'video_config_test.yaml'
- # 只有一台可以有声音,其他没有声音,这是音量
- self.volume = 65
- self._init_kodi_clients_from_config()
- self._init_video_infos_from_config()
- def set_volume(self, volume):
- # 设置播放视频的音量
- self.volume = volume
-
- # 立即应用到符合条件的客户端
- # 逻辑与 sync_play_video 中一致:第一台非独立设备设置音量,其他静音
- # 注意:这里假设 kodi_clients 的顺序即为物理排列顺序
- found_first_non_individual = False
-
- for client in self.kodi_clients:
- if client.get_individual():
- continue
-
- if not found_first_non_individual:
- client.set_volume(self.volume)
- found_first_non_individual = True
- else:
- client.set_volume(0)
- def get_volume(self):
- """获取当前全局音量"""
- return self.volume
-
- def _init_video_infos_from_config(self):
- config = self._load_config(self.video_config_path)
- video_infos_config = config.get('video_infos', [])
- for video_info_config in video_infos_config:
- video_info = VideoInfo(video_info_config.get('name'), video_info_config.get('formula'), video_info_config.get('description'), video_info_config.get('video_duration'), video_info_config.get('video_path'), video_info_config.get('id'))
- logger.info(f"成功加载视频信息: {video_info.name}")
- self.video_infos.append(video_info)
- def _load_config(self, config_path) -> Dict[str, Any]:
- """
- 从YAML配置文件加载配置
-
- Returns:
- dict: 配置字典
-
- Raises:
- FileNotFoundError: 配置文件不存在
- yaml.YAMLError: YAML解析错误
- Exception: 其他加载错误
- """
- if not os.path.exists(config_path):
- error_msg = f"配置文件不存在: {config_path}"
- logger.error(error_msg)
- raise FileNotFoundError(error_msg)
-
- try:
- with open(config_path, 'r', encoding='utf-8') as file:
- config = yaml.safe_load(file)
- if config is None:
- error_msg = f"配置文件为空或格式错误: {config_path}"
- logger.error(error_msg)
- raise ValueError(error_msg)
- logger.info(f"成功加载配置文件: {config_path}")
- return config
- except yaml.YAMLError as e:
- error_msg = f"YAML解析错误: {e}"
- logger.error(error_msg)
- raise
- except Exception as e:
- error_msg = f"加载配置文件失败: {e}"
- logger.error(error_msg)
- raise
- def _init_kodi_clients_from_config(self):
- config = self._load_config(self.kodi_config_path)
- kodi_servers_config = config.get('kodi_servers', [])
- if not kodi_servers_config:
- logger.error("未在 config.yaml 中找到有效的 Kodi 服务器配置,脚本将退出。")
- exit()
- # 创建 Kodi 客户端实例列表
-
- for server_config in kodi_servers_config:
- client = KodiClient(
- host=server_config.get('ip', 'localhost'),
- port=server_config.get('port', 8080),
- username=server_config.get('username'),
- password=server_config.get('password'),
- id=server_config.get('id', 0)
- )
- try:
- if hasattr(client, 'set_volume') and callable(getattr(client, 'set_volume')):
- client.set_volume(65)
- else:
- # 兼容旧构建:直接通过 JSON-RPC 设置音量
- client._send_request("Application.SetVolume", {"volume": 65})
- except Exception as e:
- logger.warning(f"设置音量时出现问题(已忽略以继续):{e}")
- self.kodi_clients.append(client)
- def _resolve_config_path(self, filename = "kodi_config.yaml"):
- """Return absolute path to config file, located next to the script/exe.
- When bundled with PyInstaller, sys.frozen is True and sys.executable points to the exe.
- In normal execution, use the directory of this file.
- """
- try:
- if getattr(sys, "frozen", False):
- base_dir = os.path.dirname(sys.executable)
- else:
- base_dir = os.path.dirname(os.path.abspath(__file__))
- return os.path.join(base_dir, filename)
- except Exception:
- # Fallback to current working directory
- return filename
- def sync_play_video(self, clients, video_path, loop=False):
- """同步播放视频"""
- # 创建一个共享的Event来控制所有客户端同时开始播放
- start_event = threading.Event()
- logger.info(f"开始同步播放视频: {video_path}")
-
- # 创建播放线程
- def play_thread(client):
- # 等待开始信号
- start_event.wait()
- # 执行播放
- result = client.play_video(video_path, loop=loop)
- logger.info(f"播放视频: {video_path}, 循环: {loop}")
- logger.info(f"播放视频结果: {result}")
- # 启动所有播放线程
- threads = []
- client_index = 0
- for client in clients:
-
- if client.get_individual():
- client_index += 1
- continue
- # 只对第一台设置音量
- if client == clients[client_index]:
- client.set_volume(self.volume)
- else:
- client.set_volume(0)
- thread = threading.Thread(target=play_thread, args=(client,))
- threads.append(thread)
- thread.start()
- # 等待所有线程准备就绪
- time.sleep(0.1) # 给线程一点时间启动
-
- # 同时触发所有客户端开始播放
- start_event.set()
- # 等待所有线程完成
- for thread in threads:
- thread.join()
- # 指定某台播放url图片
- def play_url_image_on_client(self, client, image_url):
- """指定某台播放url图片"""
- client.isIndividual = True
- return client.play_url_image(image_url)
- # 指定某台播放rtsp视频流
- def play_rtsp_video_on_client(self, client, rtsp_url,volume=0):
- """指定某台播放rtsp视频流"""
- client.isIndividual = True
- client.set_volume(volume)
- return client.play_rtsp_video(rtsp_url)
- # 撤销所有客户端的独立状态
- def revoke_individual_state(self):
- """撤销所有客户端的独立状态"""
- for client in self.kodi_clients:
- client.isIndividual = False
- client.stop_playback()
- # 启动所有kodi应用程序
- def start_all_kodi_apps(self):
- """启动所有kodi应用程序"""
- for client in self.kodi_clients:
- client.start_kodi()
- # 检查所有kodi客户端是否在线,返回所有不在线的client_index集合
- def check_all_kodi_clients_online(self):
- """检查所有kodi客户端是否在线,返回所有不在线的客户端索引集合"""
- offline_indices = set()
- client_index = 0
- for client in self.kodi_clients:
- if not client.kodi_heartbeat_check():
- offline_indices.add(client_index)
- client_index += 1
- return offline_indices
|