|
|
@@ -21,7 +21,12 @@ class ConnectionManager:
|
|
|
self._pubsub = None
|
|
|
# 异步 Redis 客户端(用于订阅)
|
|
|
self._async_redis = None
|
|
|
-
|
|
|
+
|
|
|
+ def _worker_stats(self) -> tuple:
|
|
|
+ """本 worker 上 WS 连接总数、有连接的用户数。"""
|
|
|
+ total_ws = sum(len(conns) for conns in self.active_connections.values())
|
|
|
+ return total_ws, len(self.active_connections)
|
|
|
+
|
|
|
async def _start_subscriber(self):
|
|
|
"""启动 Redis 订阅者,监听消息频道"""
|
|
|
try:
|
|
|
@@ -43,7 +48,7 @@ class ConnectionManager:
|
|
|
# 订阅个人消息频道和广播频道
|
|
|
await self._pubsub.subscribe("ws:personal", "ws:broadcast")
|
|
|
|
|
|
- logger.info(f"WebSocket Redis subscriber started (instance: {self.instance_id})")
|
|
|
+ logger.info(f"WebSocket Redis 订阅已启动(实例: {self.instance_id})")
|
|
|
|
|
|
# 持续监听消息
|
|
|
async for message in self._pubsub.listen():
|
|
|
@@ -63,16 +68,15 @@ class ConnectionManager:
|
|
|
worker_pid = os.getpid()
|
|
|
local_connections_count = len(self.active_connections.get(user_id, []))
|
|
|
logger.debug(
|
|
|
- f"Received Redis message for user {user_id}. "
|
|
|
- f"Worker PID: {worker_pid}, Instance ID: {self.instance_id}, "
|
|
|
- f"Sender Instance: {sender_instance}, "
|
|
|
- f"Local connections: {local_connections_count}"
|
|
|
+ f"收到 Redis 用户消息 user_id={user_id},"
|
|
|
+ f"进程 PID={worker_pid},本实例={self.instance_id},"
|
|
|
+ f"发送方实例={sender_instance},本地连接数={local_connections_count}"
|
|
|
)
|
|
|
await self._send_to_local_connections(user_id, msg_data)
|
|
|
elif sender_instance == self.instance_id:
|
|
|
logger.debug(
|
|
|
- f"Ignored own message for user {user_id} "
|
|
|
- f"(Worker PID: {os.getpid()}, Instance ID: {self.instance_id})"
|
|
|
+ f"忽略本实例发出的用户消息 user_id={user_id} "
|
|
|
+ f"(PID={os.getpid()},实例={self.instance_id})"
|
|
|
)
|
|
|
|
|
|
elif channel == 'ws:broadcast':
|
|
|
@@ -85,23 +89,22 @@ class ConnectionManager:
|
|
|
worker_pid = os.getpid()
|
|
|
total_connections = sum(len(conns) for conns in self.active_connections.values())
|
|
|
logger.debug(
|
|
|
- f"Received Redis broadcast message. "
|
|
|
- f"Worker PID: {worker_pid}, Instance ID: {self.instance_id}, "
|
|
|
- f"Sender Instance: {sender_instance}, "
|
|
|
- f"Total local connections: {total_connections}"
|
|
|
+ f"收到 Redis 广播消息,"
|
|
|
+ f"进程 PID={worker_pid},本实例={self.instance_id},"
|
|
|
+ f"发送方实例={sender_instance},本地 WS 总数={total_connections}"
|
|
|
)
|
|
|
await self._broadcast_to_local_connections(msg_data)
|
|
|
else:
|
|
|
logger.debug(
|
|
|
- f"Ignored own broadcast message "
|
|
|
- f"(Worker PID: {os.getpid()}, Instance ID: {self.instance_id})"
|
|
|
+ f"忽略本实例发出的广播消息 "
|
|
|
+ f"(PID={os.getpid()},实例={self.instance_id})"
|
|
|
)
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error processing Redis message: {e}")
|
|
|
+ logger.error(f"处理 Redis 消息失败: {e}")
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Redis subscriber error: {e}")
|
|
|
+ logger.error(f"Redis 订阅异常: {e}")
|
|
|
# 如果订阅失败,尝试重新连接
|
|
|
await asyncio.sleep(5)
|
|
|
if self._pubsub:
|
|
|
@@ -115,7 +118,7 @@ class ConnectionManager:
|
|
|
try:
|
|
|
await connection.send_json(message)
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error sending message to user {user_id}: {e}")
|
|
|
+ logger.error(f"向用户推送消息失败 user_id={user_id}: {e}")
|
|
|
# 连接可能已断开,从列表中移除
|
|
|
if connection in self.active_connections[user_id]:
|
|
|
self.active_connections[user_id].remove(connection)
|
|
|
@@ -130,7 +133,7 @@ class ConnectionManager:
|
|
|
try:
|
|
|
await connection.send_text(message)
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error broadcasting message: {e}")
|
|
|
+ logger.error(f"本地广播消息失败: {e}")
|
|
|
|
|
|
async def connect(self, websocket: WebSocket, user_id: int):
|
|
|
await websocket.accept()
|
|
|
@@ -140,11 +143,13 @@ class ConnectionManager:
|
|
|
|
|
|
# 增强日志:记录 worker 信息
|
|
|
worker_pid = os.getpid()
|
|
|
+ total_ws, users_on_worker = self._worker_stats()
|
|
|
logger.info(
|
|
|
- f"User {user_id} connected via WebSocket. "
|
|
|
- f"Worker PID: {worker_pid}, Instance ID: {self.instance_id}, "
|
|
|
- f"Total connections for this user: {len(self.active_connections[user_id])}, "
|
|
|
- f"Total users on this worker: {len(self.active_connections)}"
|
|
|
+ f"用户 WebSocket 已连接 user_id={user_id},"
|
|
|
+ f"进程 PID={worker_pid},实例={self.instance_id},"
|
|
|
+ f"该用户在本 worker 连接数={len(self.active_connections[user_id])},"
|
|
|
+ f"本 worker WS 总数={total_ws},"
|
|
|
+ f"本 worker 有连接的用户数={users_on_worker}"
|
|
|
)
|
|
|
|
|
|
# 首次连接时启动订阅者(如果还没启动)
|
|
|
@@ -153,8 +158,7 @@ class ConnectionManager:
|
|
|
|
|
|
def disconnect(self, websocket: WebSocket, user_id: int):
|
|
|
worker_pid = os.getpid()
|
|
|
- was_connected = user_id in self.active_connections
|
|
|
-
|
|
|
+
|
|
|
if user_id in self.active_connections:
|
|
|
if websocket in self.active_connections[user_id]:
|
|
|
self.active_connections[user_id].remove(websocket)
|
|
|
@@ -163,11 +167,13 @@ class ConnectionManager:
|
|
|
|
|
|
# 增强日志:记录 worker 信息和断开后的状态
|
|
|
remaining_connections = len(self.active_connections.get(user_id, []))
|
|
|
+ total_ws, users_on_worker = self._worker_stats()
|
|
|
logger.info(
|
|
|
- f"User {user_id} disconnected from WebSocket. "
|
|
|
- f"Worker PID: {worker_pid}, Instance ID: {self.instance_id}, "
|
|
|
- f"Remaining connections for this user: {remaining_connections}, "
|
|
|
- f"Total users on this worker: {len(self.active_connections)}"
|
|
|
+ f"用户 WebSocket 已断开 user_id={user_id},"
|
|
|
+ f"进程 PID={worker_pid},实例={self.instance_id},"
|
|
|
+ f"该用户在本 worker 剩余连接数={remaining_connections},"
|
|
|
+ f"本 worker WS 总数={total_ws},"
|
|
|
+ f"本 worker 有连接的用户数={users_on_worker}"
|
|
|
)
|
|
|
|
|
|
async def broadcast(self, message: str):
|
|
|
@@ -183,7 +189,7 @@ class ConnectionManager:
|
|
|
}
|
|
|
redis_client.publish("ws:broadcast", json.dumps(payload))
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error publishing broadcast to Redis: {e}")
|
|
|
+ logger.error(f"Redis 发布广播失败: {e}")
|
|
|
|
|
|
async def send_personal_message(self, message: dict, user_id: int):
|
|
|
"""
|
|
|
@@ -207,18 +213,17 @@ class ConnectionManager:
|
|
|
|
|
|
# 记录发送日志
|
|
|
logger.debug(
|
|
|
- f"Sending message to user {user_id}. "
|
|
|
- f"Worker PID: {worker_pid}, Instance ID: {self.instance_id}, "
|
|
|
- f"Local connection: {local_has_connection}, "
|
|
|
- f"Local connections count: {local_connections_count}, "
|
|
|
- f"Published to Redis: ws:personal"
|
|
|
+ f"向用户推送(含 Redis 发布)user_id={user_id},"
|
|
|
+ f"PID={worker_pid},实例={self.instance_id},"
|
|
|
+ f"本地是否有连接={local_has_connection},本地连接数={local_connections_count},"
|
|
|
+ f"频道=ws:personal"
|
|
|
)
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error publishing message to Redis for user {user_id}: {e}")
|
|
|
+ logger.error(f"Redis 发布用户消息失败 user_id={user_id}: {e}")
|
|
|
|
|
|
async def shutdown(self):
|
|
|
"""关闭订阅者和连接"""
|
|
|
- logger.info(f"Shutting down WebSocket manager (instance: {self.instance_id})...")
|
|
|
+ logger.info(f"正在关闭 WebSocket 管理器(实例: {self.instance_id})…")
|
|
|
|
|
|
# 取消订阅任务
|
|
|
if self._subscriber_task and not self._subscriber_task.done():
|
|
|
@@ -234,15 +239,15 @@ class ConnectionManager:
|
|
|
await self._pubsub.unsubscribe()
|
|
|
await self._pubsub.close()
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error closing pubsub: {e}")
|
|
|
+ logger.error(f"关闭 Pub/Sub 失败: {e}")
|
|
|
|
|
|
# 关闭异步 Redis 客户端
|
|
|
if self._async_redis:
|
|
|
try:
|
|
|
await self._async_redis.close()
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error closing async redis: {e}")
|
|
|
+ logger.error(f"关闭异步 Redis 客户端失败: {e}")
|
|
|
|
|
|
- logger.info("WebSocket manager shutdown complete")
|
|
|
+ logger.info("WebSocket 管理器已关闭")
|
|
|
|
|
|
manager = ConnectionManager()
|