*/
    private static array $connections = [];

    /** True once the event-bus consumer Timer has been registered. */
    private static bool $eventBusConsumerStarted = false;

    /** True once the admin live-snapshot Timer has been registered. */
    private static bool $adminSnapshotTickerStarted = false;

    /** Topic on which the admin live snapshot is broadcast. */
    private const ADMIN_SNAPSHOT_TOPIC = 'admin.live.snapshot';

    /**
     * Pulls events from the Redis queue and pushes them to subscribed connections.
     *
     * In some environments the WebSocket process's onWorkerStart may not fire,
     * so this is also invoked as a fallback at handshake time. The static flag
     * guarantees that only one Timer is ever registered.
     */
    private static function ensureEventBusConsumer(): void
    {
        if (self::$eventBusConsumerStarted) {
            return;
        }
        self::$eventBusConsumerStarted = true;

        Timer::add(1, static function (): void {
            $events = GameWebSocketEventBus::popBatch();
            if ($events === []) {
                return;
            }

            foreach ($events as $event) {
                $topic = $event['topic'] ?? '';
                if (!is_string($topic) || $topic === '') {
                    continue;
                }

                $data = $event['data'] ?? [];
                if (!is_array($data)) {
                    $data = [];
                }

                // The frame is identical for every subscriber, so it is encoded
                // at most once per event inside broadcastToTopic().
                self::broadcastToTopic($topic, [
                    'event' => $event['event'] ?? $topic,
                    'topic' => $topic,
                    'data' => $data,
                    'server_time' => $event['server_time'] ?? time(),
                ]);
            }
        });
    }

    /**
     * Fallback direct push: builds and broadcasts admin.live.snapshot every second.
     *
     * Purpose: even if the Redis queue is unavailable, /admin/game/live still
     * sees game changes in real time.
     */
    private static function ensureAdminLiveSnapshotTicker(): void
    {
        if (self::$adminSnapshotTickerStarted) {
            return;
        }
        self::$adminSnapshotTickerStarted = true;

        Timer::add(1, static function (): void {
            // Skip all game-state work when nobody is listening.
            if (!self::hasSubscriberFor(self::ADMIN_SNAPSHOT_TOPIC)) {
                return;
            }

            // Aligned with GameLiveTicker: if only snapshots were pushed while the
            // live ticker is not running, the countdown would reach zero but the
            // draw would never happen.
            GameLiveService::finalizePayoutGrace();
            GameLiveService::tickAutoDraw();

            self::broadcastToTopic(self::ADMIN_SNAPSHOT_TOPIC, [
                'event' => self::ADMIN_SNAPSHOT_TOPIC,
                'topic' => self::ADMIN_SNAPSHOT_TOPIC,
                'data' => GameLiveService::buildSnapshot(null),
                'server_time' => time(),
            ]);
        });
    }

    /**
     * Encodes a frame as JSON with unescaped unicode and slashes.
     *
     * @param array $frame Frame fields to serialise.
     * @return string|null The JSON text, or null when encoding fails or yields an empty string.
     */
    private static function encodeFrame(array $frame): ?string
    {
        $json = json_encode($frame, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);

        return is_string($json) && $json !== '' ? $json : null;
    }

    /**
     * Sends one frame to a single connection; a frame that cannot be encoded is dropped.
     *
     * @param array $frame Frame fields to serialise and send.
     */
    private static function sendFrame(TcpConnection $connection, array $frame): void
    {
        $json = self::encodeFrame($frame);
        if ($json === null) {
            // json_encode() returns false on failure; never forward that to send().
            return;
        }

        $connection->send($json);
    }

    /**
     * Tells whether the connection's topic list contains the given topic (strict match).
     */
    private static function isSubscribedTo(TcpConnection $connection, string $topic): bool
    {
        $topics = $connection->topics ?? [];

        return is_array($topics) && in_array($topic, $topics, true);
    }

    /**
     * Tells whether at least one tracked connection is subscribed to the topic.
     */
    private static function hasSubscriberFor(string $topic): bool
    {
        foreach (self::$connections as $connection) {
            if (self::isSubscribedTo($connection, $topic)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Sends the same frame to every tracked connection subscribed to the topic.
     *
     * The frame is encoded lazily on the first matching subscriber and reused
     * for the rest; if encoding fails nothing is sent.
     *
     * @param array $frame Frame fields to serialise and broadcast.
     */
    private static function broadcastToTopic(string $topic, array $frame): void
    {
        $payload = null;

        foreach (self::$connections as $connection) {
            if (!self::isSubscribedTo($connection, $topic)) {
                continue;
            }

            $payload ??= self::encodeFrame($frame);
            if ($payload === null) {
                return;
            }

            $connection->send($payload);
        }
    }

    /**
     * Worker start hook: registers both timers.
     */
    public function onWorkerStart(): void
    {
        self::ensureEventBusConsumer();
        self::ensureAdminLiveSnapshotTicker();
    }

    /**
     * Starts tracking a new connection with an empty topic list.
     */
    public function onConnect(TcpConnection $connection): void
    {
        $connection->topics = [];
        self::$connections[$connection->id] = $connection;
    }

    /**
     * Handshake hook: makes sure both timers are running (fallback for
     * environments where onWorkerStart did not fire) and greets the client.
     */
    public function onWebSocketConnect(TcpConnection $connection): void
    {
        self::ensureEventBusConsumer();
        self::ensureAdminLiveSnapshotTicker();

        self::sendFrame($connection, [
            'event' => 'ws.connected',
            'message' => 'WebSocket connected',
            'connection_id' => $connection->id,
            'server_time' => time(),
            'heartbeat_interval' => 30,
        ]);
    }

    /**
     * Handles a client message.
     *
     * Supported actions: "ping" (answered with "pong") and "subscribe"
     * (replaces the connection's topic list with the sanitised "topics" array).
     * Anything else, including a payload that does not decode to an array,
     * is answered with a "ws.error" frame.
     */
    public function onMessage(TcpConnection $connection, string $payload): void
    {
        $decoded = json_decode($payload, true);
        if (!is_array($decoded)) {
            self::sendFrame($connection, [
                'event' => 'ws.error',
                'message' => 'Invalid JSON payload',
            ]);

            return;
        }

        $action = $decoded['action'] ?? '';

        if ($action === 'ping') {
            self::sendFrame($connection, [
                'event' => 'pong',
                'server_time' => date('Y-m-d H:i:s'),
            ]);

            return;
        }

        if ($action === 'subscribe') {
            $requested = $decoded['topics'] ?? [];
            $sanitized = [];

            if (is_array($requested)) {
                foreach ($requested as $candidate) {
                    if (!is_string($candidate)) {
                        continue;
                    }

                    $value = trim($candidate);
                    if ($value !== '') {
                        $sanitized[] = $value;
                    }
                }
            }

            // A subscribe request replaces (not extends) the previous topic list.
            $connection->topics = array_values(array_unique($sanitized));

            self::sendFrame($connection, [
                'event' => 'ws.subscribed',
                'topics' => $connection->topics,
            ]);
            self::pushAdminTestOddsPreview($connection, $connection->topics);

            return;
        }

        self::sendFrame($connection, [
            'event' => 'ws.error',
            'message' => 'Unsupported action',
            'received_action' => $action,
        ]);
    }

    /**
     * Reports a connection error to the client as a "ws.error" frame.
     *
     * NOTE(review): if the framework invokes this hook for send failures,
     * replying on the same connection may re-enter this handler — confirm.
     */
    public function onError(TcpConnection $connection, int $code, string $msg): void
    {
        self::sendFrame($connection, [
            'event' => 'ws.error',
            'message' => 'Server internal error',
            'code' => $code,
            'detail' => $msg,
        ]);
    }

    /**
     * Clears the connection's topics and stops tracking it.
     */
    public function onClose(TcpConnection $connection): void
    {
        $connection->topics = [];
        unset(self::$connections[$connection->id]);
    }

    /**
     * Admin integration testing: immediately pushes demo frames after subscribing
     * to odds-related topics (sample players from the database, flagged
     * is_test / preview).
     *
     * @param list<string> $topics
     */
    private static function pushAdminTestOddsPreview(TcpConnection $connection, array $topics): void
    {
        $frames = GameWebSocketPayloadHelper::adminTestPushFrames($topics);
        if ($frames === []) {
            return;
        }

        // One timestamp shared by every demo frame of this batch.
        $serverTime = time();

        foreach ($frames as $frame) {
            self::sendFrame($connection, [
                'event' => $frame['event'],
                'topic' => $frame['topic'],
                'data' => $frame['data'],
                'server_time' => $serverTime,
            ]);
        }
    }
}