服务端:`{"action":"ping"}` / `{"action":"subscribe","topics":[...]}` * - 服务端 -> 客户端:`ws.connected` / `ws.subscribed` / `pong` / `ws.error` / 业务事件帧 * - 主动心跳超时:连接 60s 内无任何上行报文(含 ping)即被 server 主动 close, * 触发客户端重连,避免半关闭僵尸连接堵在分发表里 * - 事件分发由 GameWebSocketDispatcher 负责;每次 publish/consume/dispatch/send/订阅/关闭 * 都写入独立日志通道 runtime/logs/ws.log * * 高可用: * - 消费 Timer 的 popBatch 异常已被 EventBus 内部捕获并记录,下次 tick 自动恢复 * - 任一连接 send 异常不会影响其它连接的下发(Dispatcher 内单连接 try/catch) * - admin.live.snapshot 兜底直推 Timer 与队列消费解耦,Redis 异常时后台仍能看到对局状态 */ class GameWebSocketServer { /** @var array connection_id => TcpConnection(仅本进程内) */ private static array $connections = []; /** @var array> connection_id => 鉴权元数据(mode/admin_id 等,便于日志/排查) */ private static array $connectionAuth = []; private static bool $eventBusConsumerStarted = false; private static bool $adminSnapshotTickerStarted = false; private static bool $heartbeatCheckerStarted = false; /** @var bool */ private static bool $liveStateTickerStarted = false; /** 客户端 60s 内无任何上行报文(含 ping)即被主动断开 */ private const HEARTBEAT_IDLE_SECONDS = 60; /** 事件队列每 N ms 拉一次(默认 1s;与文档 §7.1.3 一致) */ private const QUEUE_TICK_INTERVAL = 1; /** 心跳超时检查间隔(秒) */ private const HEARTBEAT_CHECK_INTERVAL = 10; public function onWorkerStart(): void { Log::channel('ws')->info('ws worker start', [ 'pid' => function_exists('posix_getpid') ? posix_getpid() : getmypid(), ]); self::ensureEventBusConsumer(); self::ensureAdminLiveSnapshotTicker(); self::ensureHeartbeatChecker(); } public function onConnect(TcpConnection $connection): void { $connection->topics = []; $connection->wsAuth = null; self::$connections[$connection->id] = $connection; Log::channel('ws')->debug('tcp onConnect', [ 'connection_id' => $connection->id, 'remote' => method_exists($connection, 'getRemoteIp') ? $connection->getRemoteIp() : '', ]); } /** * Workerman 在 WebSocket 握手时回调;$request 可能是 string(HTTP raw)或 Webman\Http\Request 对象, * 这里仅尝试从 connection 内置变量取 QueryString 做最大兼容。 */ public function onWebSocketConnect(TcpConnection $connection, mixed $request = null): void { self::ensureEventBusConsumer(); self::ensureAdminLiveSnapshotTicker(); self::ensureHeartbeatChecker(); self::ensureLiveStateTicker(); $queryString = self::extractQueryString($connection, $request); $query = GameWebSocketAuthHelper::parseQueryString($queryString); $auth = GameWebSocketAuthHelper::authorize($query); $remoteIp = self::remoteIp($connection); if (!$auth['ok']) { Log::channel('ws')->warning('handshake denied', [ 'connection_id' => $connection->id, 'remote_ip' => $remoteIp, 'reason' => $auth['reason'], 'query_keys' => array_keys($query), ]); GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [ 'code' => 1101, 'message' => 'Authentication failed: ' . ($auth['reason'] ?: 'unauthorized'), ], 'handshake_denied'); try { $connection->close(); } catch (Throwable) { } return; } $connection->wsAuth = $auth; self::$connectionAuth[$connection->id] = $auth; GameWebSocketSubscriptionRegistry::registerConnection( $connection->id, (int) $auth['user_id'], is_string($auth['mode'] ?? null) ? (string) $auth['mode'] : 'mobile', $remoteIp ); Log::channel('ws')->info('handshake ok', [ 'connection_id' => $connection->id, 'remote_ip' => $remoteIp, 'mode' => $auth['mode'], 'user_id' => $auth['user_id'], 'admin_id' => $auth['admin_id'], ]); GameWebSocketDispatcher::sendDirect($connection, 'ws.connected', [ 'message' => 'WebSocket connected', 'mode' => $auth['mode'], 'server_time' => time(), 'heartbeat_interval' => 30, 'idle_timeout' => self::HEARTBEAT_IDLE_SECONDS, ], 'handshake_ok'); } public function onMessage(TcpConnection $connection, string $payload): void { GameWebSocketSubscriptionRegistry::touch($connection->id); $decoded = json_decode($payload, true); if (!is_array($decoded)) { Log::channel('ws')->debug('invalid json payload', [ 'connection_id' => $connection->id, 'payload_size' => strlen($payload), ]); GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [ 'message' => 'Invalid JSON payload', ], 'invalid_json'); return; } $action = isset($decoded['action']) && is_string($decoded['action']) ? trim($decoded['action']) : ''; if ($action === 'ping') { GameWebSocketDispatcher::sendDirect($connection, 'pong', [ 'server_time' => date('Y-m-d H:i:s'), 'server_time_ts' => time(), ], 'pong'); return; } if ($action === 'subscribe') { $rawTopics = $decoded['topics'] ?? []; $rawList = is_array($rawTopics) ? $rawTopics : []; $finalTopics = GameWebSocketSubscriptionRegistry::replaceSubscriptions($connection->id, $rawList); Log::channel('ws')->info('subscribe', [ 'connection_id' => $connection->id, 'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($connection->id), 'topics' => $finalTopics, 'requested_count' => is_array($rawTopics) ? count($rawTopics) : 0, ]); GameWebSocketDispatcher::sendDirect($connection, 'ws.subscribed', [ 'topics' => $finalTopics, ], 'subscribed'); self::pushAdminTestOddsPreview($connection, $finalTopics); return; } Log::channel('ws')->debug('unsupported action', [ 'connection_id' => $connection->id, 'action' => $action, ]); GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [ 'message' => 'Unsupported action', 'received_action' => $action, ], 'unsupported_action'); } public function onError(TcpConnection $connection, int $code, string $msg): void { Log::channel('ws')->warning('connection error', [ 'connection_id' => $connection->id, 'code' => $code, 'detail' => $msg, ]); try { $connection->send(json_encode([ 'event' => 'ws.error', 'message' => 'Server internal error', 'code' => $code, 'detail' => $msg, ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); } catch (Throwable) { } } public function onClose(TcpConnection $connection): void { $auth = self::$connectionAuth[$connection->id] ?? []; Log::channel('ws')->info('connection close', [ 'connection_id' => $connection->id, 'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($connection->id), 'mode' => $auth['mode'] ?? '', 'remaining_connections' => max(0, GameWebSocketSubscriptionRegistry::connectionCount() - 1), ]); GameWebSocketSubscriptionRegistry::unregisterConnection($connection->id); unset(self::$connections[$connection->id], self::$connectionAuth[$connection->id]); } // ============================================================ // 内部:Timer / 兜底 // ============================================================ /** * 每秒拉取 Redis 队列事件并分发到对应订阅连接。 * popBatch 内部已捕获 Redis 异常并写 ws 日志,Timer 不会因此停跑。 */ private static function ensureEventBusConsumer(): void { if (self::$eventBusConsumerStarted) { return; } self::$eventBusConsumerStarted = true; Timer::add(self::QUEUE_TICK_INTERVAL, static function (): void { try { $events = GameWebSocketEventBus::popBatch(); if ($events === []) { return; } foreach ($events as $event) { try { GameWebSocketDispatcher::dispatch($event, self::$connections); } catch (Throwable $e) { Log::channel('ws')->error('dispatch loop exception', [ 'topic' => $event['topic'] ?? '', 'error' => $e->getMessage(), ]); } } } catch (Throwable $e) { Log::channel('ws')->error('event bus consumer tick exception', [ 'error' => $e->getMessage(), ]); } }); Log::channel('ws')->info('event bus consumer started', [ 'tick_interval_seconds' => self::QUEUE_TICK_INTERVAL, ]); } /** * admin.live.snapshot 兜底:每秒构建一次快照直接推送(不依赖 Redis 队列)。 * 用于 /admin/game/live 实时对局页,确保 Redis 异常时后台仍能看到对局状态。 */ private static function ensureAdminLiveSnapshotTicker(): void { if (self::$adminSnapshotTickerStarted) { return; } self::$adminSnapshotTickerStarted = true; Timer::add(1, static function (): void { $cids = GameWebSocketSubscriptionRegistry::connectionsForTopic('admin.live.snapshot'); if ($cids === []) { return; } try { $snapshot = GameLiveService::buildSnapshot(null); } catch (Throwable $e) { Log::channel('ws')->error('admin snapshot build failed', [ 'error' => $e->getMessage(), ]); return; } $payload = json_encode([ 'event' => 'admin.live.snapshot', 'topic' => 'admin.live.snapshot', 'data' => GameWebSocketPayloadHelper::sanitizeOutboundData($snapshot), 'server_time' => time(), ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); if (!is_string($payload) || $payload === '') { return; } $sent = 0; $failed = 0; foreach ($cids as $cid) { if (!isset(self::$connections[$cid])) { continue; } try { self::$connections[$cid]->send($payload); $sent++; } catch (Throwable $e) { $failed++; Log::channel('ws')->warning('admin snapshot send failed', [ 'connection_id' => $cid, 'error' => $e->getMessage(), ]); } } }); } /** * 心跳超时检查器:每 10s 扫描一次,关闭 60s 无上行报文的连接。 * 触发客户端重连,避免半关闭僵尸连接持续接收推送但不实际送达。 */ private static function ensureHeartbeatChecker(): void { if (self::$heartbeatCheckerStarted) { return; } self::$heartbeatCheckerStarted = true; Timer::add(self::HEARTBEAT_CHECK_INTERVAL, static function (): void { $cutoff = time() - self::HEARTBEAT_IDLE_SECONDS; $stale = GameWebSocketSubscriptionRegistry::staleConnections($cutoff); if ($stale === []) { return; } foreach ($stale as $cid) { Log::channel('ws')->info('close idle connection', [ 'connection_id' => $cid, 'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($cid), 'idle_seconds' => self::HEARTBEAT_IDLE_SECONDS, ]); if (isset(self::$connections[$cid])) { try { self::$connections[$cid]->close(); } catch (Throwable) { } } } }); Log::channel('ws')->info('heartbeat checker started', [ 'idle_seconds' => self::HEARTBEAT_IDLE_SECONDS, 'check_interval' => self::HEARTBEAT_CHECK_INTERVAL, ]); } /** * 兜底:当 gameLiveTicker 未运行/卡死时,WS 进程也能驱动对局推进,避免对局长期卡在派彩中(status=3)。 * * - 每秒执行一次 recoverLiveRoundState()(内部:finalizePayoutGrace + tickAutoDraw) * - 幂等且有锁保护,和 gameLiveTicker 并存也不会破坏一致性 */ private static function ensureLiveStateTicker(): void { if (self::$liveStateTickerStarted) { return; } self::$liveStateTickerStarted = true; Timer::add(1, static function (): void { try { GameLiveService::recoverLiveRoundState(); } catch (Throwable $e) { Log::channel('ws')->error('live state ticker failed', [ 'error' => $e->getMessage(), ]); } }); Log::channel('ws')->info('live state ticker started', [ 'tick_interval_seconds' => 1, ]); } /** * 后台联调:订阅赔率相关主题后立即推送演示帧(库内样例玩家,带 is_test / preview)。 * * @param list $topics */ private static function pushAdminTestOddsPreview(TcpConnection $connection, array $topics): void { $frames = GameWebSocketPayloadHelper::adminTestPushFrames($topics); if ($frames === []) { return; } $serverTime = time(); foreach ($frames as $frame) { GameWebSocketDispatcher::sendDirect($connection, $frame['event'], [ 'topic' => $frame['topic'], 'data' => $frame['data'], 'server_time' => $serverTime, ], 'admin_test_preview'); } } /** * 兼容多种 Workerman 版本:尝试取握手 URI 的 query 部分。 * Workerman v5 中 onWebSocketConnect 第二参数已是 array $headers/$request;这里尽量兼容字符串与对象。 */ private static function extractQueryString(TcpConnection $connection, mixed $request): string { if (isset($connection->wsHandshakeQuery) && is_string($connection->wsHandshakeQuery)) { return (string) $connection->wsHandshakeQuery; } if (isset($connection->onWebSocketConnect)) { // noop } $serverUri = ''; if (isset($_SERVER['REQUEST_URI']) && is_string($_SERVER['REQUEST_URI'])) { $serverUri = $_SERVER['REQUEST_URI']; } if ($serverUri === '' && isset($connection->headers) && is_array($connection->headers)) { $line = $connection->headers['get'] ?? $connection->headers['GET'] ?? ''; if (is_string($line)) { $serverUri = $line; } } if ($serverUri === '' && is_string($request)) { if (preg_match('#^GET\s+([^\s]+)#i', $request, $m) === 1) { $serverUri = $m[1]; } } if ($serverUri === '' && is_object($request) && method_exists($request, 'queryString')) { $serverUri = '?' . (string) $request->queryString(); } $qPos = strpos($serverUri, '?'); if ($qPos === false) { return ''; } return substr($serverUri, $qPos + 1); } private static function remoteIp(TcpConnection $connection): string { try { if (method_exists($connection, 'getRemoteIp')) { return (string) $connection->getRemoteIp(); } } catch (Throwable) { } return ''; } }