*/ private static array $connections = []; private static bool $eventBusConsumerStarted = false; /** * 从 Redis 队列拉取事件并推送给已订阅连接。 * 部分环境下 WebSocket 进程的 onWorkerStart 可能未触发,因此在首帧握手处也会兜底启动一次(全局仅注册一个 Timer)。 */ private static function ensureEventBusConsumer(): void { if (self::$eventBusConsumerStarted) { return; } self::$eventBusConsumerStarted = true; Timer::add(1, static function (): void { $events = GameWebSocketEventBus::popBatch(); if ($events === []) { return; } foreach ($events as $event) { $topic = $event['topic'] ?? ''; if (!is_string($topic) || $topic === '') { continue; } $eventName = $event['event'] ?? $topic; $data = $event['data'] ?? []; if (!is_array($data)) { $data = []; } $serverTime = $event['server_time'] ?? time(); foreach (self::$connections as $connection) { $topics = $connection->topics ?? []; if (!is_array($topics) || !in_array($topic, $topics, true)) { continue; } $connection->send(json_encode([ 'event' => $eventName, 'topic' => $topic, 'data' => $data, 'server_time' => $serverTime, ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); } } }); } public function onWorkerStart(): void { self::ensureEventBusConsumer(); } public function onConnect(TcpConnection $connection): void { $connection->topics = []; self::$connections[$connection->id] = $connection; } public function onWebSocketConnect(TcpConnection $connection): void { self::ensureEventBusConsumer(); $connection->send(json_encode([ 'event' => 'ws.connected', 'message' => 'WebSocket connected', 'connection_id' => $connection->id, 'server_time' => time(), 'heartbeat_interval' => 30, ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); } public function onMessage(TcpConnection $connection, string $payload): void { $decoded = json_decode($payload, true); if (!is_array($decoded)) { $connection->send(json_encode([ 'event' => 'ws.error', 'message' => 'Invalid JSON payload', ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); return; } $action = $decoded['action'] ?? ''; if ($action === 'ping') { $connection->send(json_encode([ 'event' => 'pong', 'server_time' => date('Y-m-d H:i:s'), ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); return; } if ($action === 'subscribe') { $topics = $decoded['topics'] ?? []; $sanitized = []; if (is_array($topics)) { foreach ($topics as $topic) { if (!is_string($topic)) { continue; } $value = trim($topic); if ($value === '') { continue; } $sanitized[] = $value; } } $connection->topics = array_values(array_unique($sanitized)); $connection->send(json_encode([ 'event' => 'ws.subscribed', 'topics' => $connection->topics, ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); return; } $connection->send(json_encode([ 'event' => 'ws.error', 'message' => 'Unsupported action', 'received_action' => $action, ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); } public function onError(TcpConnection $connection, int $code, string $msg): void { $connection->send(json_encode([ 'event' => 'ws.error', 'message' => 'Server internal error', 'code' => $code, 'detail' => $msg, ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); } public function onClose(TcpConnection $connection): void { $connection->topics = []; unset(self::$connections[$connection->id]); } }