From 8f5ba977a46251b1f24f81f66c77862f7164e32e Mon Sep 17 00:00:00 2001 From: zhenhui <1276357500@qq.com> Date: Wed, 27 May 2026 10:28:39 +0800 Subject: [PATCH] =?UTF-8?q?1.=E9=87=8D=E6=9E=84websocket=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/admin/controller/game/Live.php | 13 +- .../controller/test/GameCurrentStatus.php | 13 +- .../library/admin/WebSocketConfigHelper.php | 27 + .../service/GameWebSocketAuthHelper.php | 224 +++++++++ .../service/GameWebSocketDispatcher.php | 154 ++++++ app/common/service/GameWebSocketEventBus.php | 47 +- .../GameWebSocketSubscriptionRegistry.php | 186 +++++++ app/process/GameWebSocketServer.php | 464 +++++++++++++----- config/log.php | 22 + docs/36字花-移动端接口设计草案.md | 25 +- scripts/smoke_ws_auth_helper.php | 59 +++ web/src/views/backend/game/live/index.vue | 11 +- 12 files changed, 1101 insertions(+), 144 deletions(-) create mode 100644 app/common/service/GameWebSocketAuthHelper.php create mode 100644 app/common/service/GameWebSocketDispatcher.php create mode 100644 app/common/service/GameWebSocketSubscriptionRegistry.php create mode 100644 scripts/smoke_ws_auth_helper.php diff --git a/app/admin/controller/game/Live.php b/app/admin/controller/game/Live.php index 8184224..9f0b76d 100644 --- a/app/admin/controller/game/Live.php +++ b/app/admin/controller/game/Live.php @@ -6,6 +6,7 @@ use app\common\controller\Backend; use app\common\library\admin\WebSocketConfigHelper; use app\common\service\GameLiveService; use app\common\service\GameRecordService; +use app\common\service\GameWebSocketAuthHelper; use support\Response; use Webman\Http\Request as WebmanRequest; @@ -67,9 +68,19 @@ class Live extends Backend 'auto.spin.progress', ]; + $adminId = $this->auth ? (int) ($this->auth->id ?? 0) : 0; + $adminWs = GameWebSocketAuthHelper::issueAdminWsToken($adminId); + $baseWsUrl = WebSocketConfigHelper::wsUrl($request); + $wsUrl = WebSocketConfigHelper::appendTokensToWsUrl($baseWsUrl, [ + 'admin_ws_token' => (string) ($adminWs['token'] ?? ''), + ]); + return $this->success('', [ 'name' => 'ws.admin.live', - 'ws_url' => WebSocketConfigHelper::wsUrl($request), + 'ws_url' => $wsUrl, + 'ws_base_url' => $baseWsUrl, + 'admin_ws_token' => (string) ($adminWs['token'] ?? ''), + 'admin_ws_token_ttl' => (int) ($adminWs['ttl'] ?? 0), 'connect_tip' => 'The admin live page auto-subscribes topics for status, draw result and payout events.', 'subscribe_topics' => $topics, 'sample_messages' => [ diff --git a/app/admin/controller/test/GameCurrentStatus.php b/app/admin/controller/test/GameCurrentStatus.php index 7cb7afb..3b3dde2 100644 --- a/app/admin/controller/test/GameCurrentStatus.php +++ b/app/admin/controller/test/GameCurrentStatus.php @@ -6,6 +6,7 @@ namespace app\admin\controller\test; use app\common\controller\Backend; use app\common\library\admin\WebSocketConfigHelper; +use app\common\service\GameWebSocketAuthHelper; use app\common\service\GameWebSocketPayloadHelper; use support\Response; use Webman\Http\Request as WebmanRequest; @@ -43,9 +44,19 @@ class GameCurrentStatus extends Backend $oddsPushTopics = GameWebSocketPayloadHelper::ODDS_PUSH_TOPICS; $testPlayerOdds = GameWebSocketPayloadHelper::adminTestPlayerOddsSnapshot(); + $adminId = $this->auth ? (int) ($this->auth->id ?? 0) : 0; + $adminWs = GameWebSocketAuthHelper::issueAdminWsToken($adminId); + $baseWsUrl = WebSocketConfigHelper::wsUrl($request); + $wsUrl = WebSocketConfigHelper::appendTokensToWsUrl($baseWsUrl, [ + 'admin_ws_token' => (string) ($adminWs['token'] ?? ''), + ]); + return $this->success('', [ 'name' => 'ws.period', - 'ws_url' => WebSocketConfigHelper::wsUrl($request), + 'ws_url' => $wsUrl, + 'ws_base_url' => $baseWsUrl, + 'admin_ws_token' => (string) ($adminWs['token'] ?? ''), + 'admin_ws_token_ttl' => (int) ($adminWs['ttl'] ?? 0), 'connect_tip' => '连接成功后将自动订阅下列主题。真实业务仅在有玩家下注/结算时推送赔率;本页联调会在订阅后额外推送带 is_test/preview 的演示帧(见下方测试玩家赔率)。', 'subscribe_topics' => $subscribeTopics, 'odds_push_topics' => $oddsPushTopics, diff --git a/app/common/library/admin/WebSocketConfigHelper.php b/app/common/library/admin/WebSocketConfigHelper.php index c55fbd6..e01cbb3 100644 --- a/app/common/library/admin/WebSocketConfigHelper.php +++ b/app/common/library/admin/WebSocketConfigHelper.php @@ -40,6 +40,33 @@ final class WebSocketConfigHelper return 'ws://127.0.0.1:3131/ws/'; } + /** + * 在基础 ws_url 上拼接握手鉴权 Query: + * - 后台用:auth_token + admin_ws_token(可观测全量主题,无 user_id 过滤) + * - H5 用:调用方传 user_token;与 auth_token 一起拼上去 + * + * @param array{auth_token?: string, user_token?: string, admin_ws_token?: string} $tokens + */ + public static function appendTokensToWsUrl(string $wsUrl, array $tokens): string + { + $wsUrl = trim($wsUrl); + if ($wsUrl === '') { + return $wsUrl; + } + $pairs = []; + foreach (['auth_token', 'user_token', 'admin_ws_token'] as $key) { + $val = isset($tokens[$key]) && is_string($tokens[$key]) ? trim($tokens[$key]) : ''; + if ($val !== '') { + $pairs[] = $key . '=' . rawurlencode($val); + } + } + if ($pairs === []) { + return $wsUrl; + } + $sep = str_contains($wsUrl, '?') ? '&' : '?'; + return $wsUrl . $sep . implode('&', $pairs); + } + private static function isLoopbackWsUrl(string $url): bool { $host = parse_url($url, PHP_URL_HOST); diff --git a/app/common/service/GameWebSocketAuthHelper.php b/app/common/service/GameWebSocketAuthHelper.php new file mode 100644 index 0000000..596718c --- /dev/null +++ b/app/common/service/GameWebSocketAuthHelper.php @@ -0,0 +1,224 @@ + bool, + * 'user_id' => int, + * 'mode' => 'mobile' | 'admin' | '', + * 'admin_id'=> int, + * 'reason' => string, + * 'auth_token' => string, + * 'user_token' => string, + * 'admin_ws_token' => string, + * ] + */ +final class GameWebSocketAuthHelper +{ + /** admin_ws_token 在 Redis 中的 key 前缀;value 存 admin_id,TTL 由 issueAdminWsToken 决定 */ + private const ADMIN_TOKEN_REDIS_PREFIX = 'dfw:v1:ws:admin_token:'; + private const ADMIN_TOKEN_DEFAULT_TTL = 7200; + + /** + * @param array $query 解析后的 URL Query 参数 + * @return array{ok:bool, user_id:int, mode:string, admin_id:int, reason:string, auth_token:string, user_token:string, admin_ws_token:string} + */ + public static function authorize(array $query): array + { + $authToken = self::pickFirstString($query, ['auth_token', 'auth-token', 'authToken']); + $userToken = self::pickFirstString($query, ['user_token', 'user-token', 'userToken', 'token']); + $adminWsToken = self::pickFirstString($query, ['admin_ws_token', 'admin-ws-token', 'adminWsToken']); + + // ===== Admin 旁路:只校验 admin_ws_token(由后台 wsConfig 签发,已隐含管理员身份) ===== + if ($adminWsToken !== '') { + $adminId = self::validateAdminWsToken($adminWsToken); + if ($adminId > 0) { + return [ + 'ok' => true, + 'user_id' => 0, + 'mode' => 'admin', + 'admin_id' => $adminId, + 'reason' => '', + 'auth_token' => $authToken, + 'user_token' => $userToken, + 'admin_ws_token' => $adminWsToken, + ]; + } + return self::deny('admin-ws-token invalid or expired', $authToken, $userToken, $adminWsToken); + } + + // ===== Mobile(H5):必须同时校验 auth-token + user-token ===== + if ($authToken === '') { + return self::deny('missing auth-token', '', $userToken, ''); + } + $authData = Token::get($authToken); + if (!is_array($authData) || ($authData['type'] ?? '') !== 'auth-token') { + return self::deny('invalid auth-token type', $authToken, $userToken, ''); + } + $authExpire = filter_var($authData['expire_time'] ?? 0, FILTER_VALIDATE_INT); + if ($authExpire === false || $authExpire < time()) { + return self::deny('auth-token expired', $authToken, $userToken, ''); + } + + if ($userToken === '') { + return self::deny('missing user-token', $authToken, '', ''); + } + $userData = Token::get($userToken); + if (!is_array($userData) || ($userData['type'] ?? '') !== Auth::TOKEN_TYPE) { + return self::deny('invalid user-token type', $authToken, $userToken, ''); + } + $userExpire = filter_var($userData['expire_time'] ?? 0, FILTER_VALIDATE_INT); + if ($userExpire === false || $userExpire < time()) { + return self::deny('user-token expired', $authToken, $userToken, ''); + } + $userId = filter_var($userData['user_id'] ?? 0, FILTER_VALIDATE_INT); + if ($userId === false || $userId <= 0) { + return self::deny('user-token has no user_id', $authToken, $userToken, ''); + } + + return [ + 'ok' => true, + 'user_id' => (int) $userId, + 'mode' => 'mobile', + 'admin_id' => 0, + 'reason' => '', + 'auth_token' => $authToken, + 'user_token' => $userToken, + 'admin_ws_token' => '', + ]; + } + + /** + * 为已登录的后台管理员签发短时 admin-ws-token;返回 [token, ttl]。 + * 调用方:app/admin/controller/test/GameCurrentStatus::wsConfig、app/admin/controller/game/Live::wsConfig + */ + public static function issueAdminWsToken(int $adminId, ?int $ttl = null): array + { + if ($adminId <= 0) { + return ['token' => '', 'ttl' => 0]; + } + $ttl = ($ttl !== null && $ttl > 0) ? $ttl : self::ADMIN_TOKEN_DEFAULT_TTL; + try { + $token = bin2hex(random_bytes(20)); + } catch (Throwable) { + $token = md5(uniqid('admin_ws_', true) . microtime(true) . random_int(0, PHP_INT_MAX)); + } + try { + Redis::setEx(self::ADMIN_TOKEN_REDIS_PREFIX . $token, $ttl, (string) $adminId); + } catch (Throwable) { + return ['token' => '', 'ttl' => 0]; + } + return ['token' => $token, 'ttl' => $ttl]; + } + + /** + * 校验 admin-ws-token;返回 admin_id(>0 表示有效),0 表示无效/过期。 + */ + public static function validateAdminWsToken(string $token): int + { + $token = trim($token); + if ($token === '' || strlen($token) > 96) { + return 0; + } + try { + $raw = Redis::get(self::ADMIN_TOKEN_REDIS_PREFIX . $token); + } catch (Throwable) { + return 0; + } + if ($raw === false || $raw === null || $raw === '') { + return 0; + } + $adminId = filter_var($raw, FILTER_VALIDATE_INT); + return $adminId === false ? 0 : (int) $adminId; + } + + /** + * 从 ws header 中解析 GET 行 Query(Workerman 在 onWebSocketConnect($connection, $request) 时 + * $request 可能为字符串或对象;为兼容,这里允许直接传 URI Query 字符串)。 + * + * @return array + */ + public static function parseQueryString(string $queryString): array + { + $queryString = trim($queryString); + if ($queryString === '') { + return []; + } + if ($queryString[0] === '?') { + $queryString = substr($queryString, 1); + } + $out = []; + parse_str($queryString, $out); + $clean = []; + foreach ($out as $k => $v) { + if (!is_string($k)) { + continue; + } + if (is_string($v)) { + $clean[$k] = $v; + } elseif (is_scalar($v)) { + $clean[$k] = (string) $v; + } + } + return $clean; + } + + /** + * @param array $query + * @param list $keys + */ + private static function pickFirstString(array $query, array $keys): string + { + foreach ($keys as $k) { + if (!isset($query[$k])) { + continue; + } + $v = $query[$k]; + if (!is_scalar($v)) { + continue; + } + $s = trim((string) $v); + if ($s !== '') { + return $s; + } + } + return ''; + } + + /** + * @return array{ok:bool, user_id:int, mode:string, admin_id:int, reason:string, auth_token:string, user_token:string, admin_ws_token:string} + */ + private static function deny(string $reason, string $authToken, string $userToken, string $adminWsToken): array + { + return [ + 'ok' => false, + 'user_id' => 0, + 'mode' => '', + 'admin_id' => 0, + 'reason' => $reason, + 'auth_token' => $authToken, + 'user_token' => $userToken, + 'admin_ws_token' => $adminWsToken, + ]; + } +} diff --git a/app/common/service/GameWebSocketDispatcher.php b/app/common/service/GameWebSocketDispatcher.php new file mode 100644 index 0000000..4ed30fc --- /dev/null +++ b/app/common/service/GameWebSocketDispatcher.php @@ -0,0 +1,154 @@ + + */ + private const USER_SCOPED_TOPICS = [ + 'bet.win', + 'user.streak', + 'wallet.changed', + 'bet.accepted', + 'auto.spin.progress', + ]; + + /** + * 分发单条事件到所有命中的连接。 + * + * @param array{topic:string, event:string, data:array, server_time:int} $event + * @param array $connections connection_id => TcpConnection + */ + public static function dispatch(array $event, array $connections): void + { + $topic = $event['topic'] ?? ''; + if (!is_string($topic) || $topic === '') { + return; + } + + $candidateIds = GameWebSocketSubscriptionRegistry::connectionsForTopic($topic); + if ($candidateIds === []) { + Log::channel('ws')->debug('dispatch skip: no subscriber', [ + 'topic' => $topic, + 'queue_server_time' => $event['server_time'] ?? 0, + ]); + return; + } + + $userScoped = in_array($topic, self::USER_SCOPED_TOPICS, true); + $payloadUserId = 0; + if ($userScoped) { + $raw = $event['data']['user_id'] ?? 0; + $parsed = filter_var($raw, FILTER_VALIDATE_INT); + $payloadUserId = $parsed === false ? 0 : (int) $parsed; + } + + $frame = json_encode([ + 'event' => $event['event'] ?? $topic, + 'topic' => $topic, + 'data' => $event['data'] ?? [], + 'server_time' => $event['server_time'] ?? time(), + ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + if (!is_string($frame) || $frame === '') { + Log::channel('ws')->warning('dispatch skip: invalid json frame', [ + 'topic' => $topic, + ]); + return; + } + + $matched = 0; + $skippedNotOwner = 0; + $skippedClosed = 0; + $sendFailed = 0; + + foreach ($candidateIds as $cid) { + if (!isset($connections[$cid])) { + $skippedClosed++; + continue; + } + if ($userScoped && $payloadUserId > 0) { + $boundUid = GameWebSocketSubscriptionRegistry::userIdOf($cid); + if ($boundUid !== $payloadUserId) { + $skippedNotOwner++; + continue; + } + } + + try { + $connections[$cid]->send($frame); + $matched++; + } catch (Throwable $e) { + $sendFailed++; + Log::channel('ws')->warning('dispatch send failed', [ + 'topic' => $topic, + 'connection_id' => $cid, + 'error' => $e->getMessage(), + ]); + } + } + + Log::channel('ws')->info('dispatch', [ + 'topic' => $topic, + 'user_scoped' => $userScoped, + 'payload_user_id' => $payloadUserId, + 'candidates' => count($candidateIds), + 'matched' => $matched, + 'skipped_not_owner' => $skippedNotOwner, + 'skipped_closed' => $skippedClosed, + 'send_failed' => $sendFailed, + 'frame_size' => strlen($frame), + ]); + } + + /** + * 直接向某连接下发单帧(握手回执 / 订阅回执 / pong / 演示帧)。 + */ + public static function sendDirect(TcpConnection $connection, string $event, array $data, string $tag = ''): void + { + $frame = json_encode(array_merge([ + 'event' => $event, + ], $data), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + if (!is_string($frame) || $frame === '') { + return; + } + try { + $connection->send($frame); + Log::channel('ws')->debug('direct send', [ + 'connection_id' => $connection->id, + 'event' => $event, + 'tag' => $tag, + 'frame_size' => strlen($frame), + ]); + } catch (Throwable $e) { + Log::channel('ws')->warning('direct send failed', [ + 'connection_id' => $connection->id, + 'event' => $event, + 'tag' => $tag, + 'error' => $e->getMessage(), + ]); + } + } +} diff --git a/app/common/service/GameWebSocketEventBus.php b/app/common/service/GameWebSocketEventBus.php index 1a801b5..d4adeaa 100644 --- a/app/common/service/GameWebSocketEventBus.php +++ b/app/common/service/GameWebSocketEventBus.php @@ -10,6 +10,8 @@ use Throwable; /** * 通过 Redis 列表在不同进程间投递 WebSocket 事件。 + * + * 入队失败统一返回 false 并写 ws 日志(runtime/logs/ws.log),便于排查"为什么没有推送"。 */ final class GameWebSocketEventBus { @@ -18,7 +20,7 @@ final class GameWebSocketEventBus /** * @param array $data - * @return bool 是否成功入队(false 表示 Redis 不可用或参数非法,调用方应避免标记“已推送”) + * @return bool 是否成功入队(false 表示 Redis 不可用或参数非法,调用方应避免标记"已推送") */ public static function publish(string $topic, array $data): bool { @@ -34,12 +36,34 @@ final class GameWebSocketEventBus ]; $json = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); if (!is_string($json) || $json === '') { + Log::channel('ws')->warning('publish skip: invalid json payload', [ + 'topic' => $topic, + ]); return false; } try { $len = Redis::lPush(self::KEY_QUEUE, $json); - return is_numeric($len) && (int) $len > 0; + $ok = is_numeric($len) && (int) $len > 0; + if ($ok) { + $uid = filter_var($data['user_id'] ?? 0, FILTER_VALIDATE_INT); + Log::channel('ws')->info('publish', [ + 'topic' => $topic, + 'user_id' => $uid === false ? 0 : (int) $uid, + 'queue_len_after' => (int) $len, + 'payload_size' => strlen($json), + ]); + } else { + Log::channel('ws')->warning('publish lpush returned non-positive', [ + 'topic' => $topic, + 'returned' => $len, + ]); + } + return $ok; } catch (Throwable $e) { + Log::channel('ws')->error('publish failed (redis exception)', [ + 'topic' => $topic, + 'error' => $e->getMessage(), + ]); Log::warning('ws event bus publish failed', [ 'topic' => $topic, 'error' => $e->getMessage(), @@ -92,10 +116,27 @@ final class GameWebSocketEventBus 'server_time' => $serverTime, ]; } - } catch (Throwable) { + } catch (Throwable $e) { + Log::channel('ws')->error('popBatch failed (redis exception)', [ + 'error' => $e->getMessage(), + 'popped' => count($out), + ]); return $out; } return $out; } + + /** + * 当前队列堆积长度(监控用)。 + */ + public static function queueLength(): int + { + try { + $len = Redis::lLen(self::KEY_QUEUE); + return is_numeric($len) ? (int) $len : 0; + } catch (Throwable) { + return -1; + } + } } diff --git a/app/common/service/GameWebSocketSubscriptionRegistry.php b/app/common/service/GameWebSocketSubscriptionRegistry.php new file mode 100644 index 0000000..fadc0b3 --- /dev/null +++ b/app/common/service/GameWebSocketSubscriptionRegistry.php @@ -0,0 +1,186 @@ + map + * - connection => [ topics: list, user_id: int, last_seen_at: int, remote_ip: string ] + * - 分发器按 (topic) 直接拿到候选连接列表,按 (data.user_id) 过滤后再 send,避免 O(N) 全遍历。 + * - **必须** 与 GameWebSocketServer 同进程使用:count=1,不可水平扩展(多 worker 间无法共享连接)。 + * + * 该类不持有 TcpConnection 引用,仅持有 connection_id 与元数据;Server 维护 connection_id => TcpConnection 映射。 + */ +final class GameWebSocketSubscriptionRegistry +{ + /** @var array> topic => { connection_id: true } */ + private static array $topicIndex = []; + + /** @var array, user_id: int, last_seen_at: int, remote_ip: string}> */ + private static array $connectionMeta = []; + + /** + * 注册新连接(onConnect 调用)。 + */ + public static function registerConnection(int $connectionId, int $userId, string $remoteIp = ''): void + { + if ($connectionId <= 0) { + return; + } + self::$connectionMeta[$connectionId] = [ + 'topics' => [], + 'user_id' => max(0, $userId), + 'last_seen_at' => time(), + 'remote_ip' => $remoteIp, + ]; + } + + /** + * 注销连接(onClose 调用):从所有 topic 索引中移除该 connection。 + */ + public static function unregisterConnection(int $connectionId): void + { + if ($connectionId <= 0 || !isset(self::$connectionMeta[$connectionId])) { + return; + } + foreach (self::$connectionMeta[$connectionId]['topics'] as $topic) { + if (isset(self::$topicIndex[$topic][$connectionId])) { + unset(self::$topicIndex[$topic][$connectionId]); + if (self::$topicIndex[$topic] === []) { + unset(self::$topicIndex[$topic]); + } + } + } + unset(self::$connectionMeta[$connectionId]); + } + + /** + * 替换该连接的订阅列表(subscribe 报文调用,覆盖式订阅,符合现网协议)。 + * + * @param list $topics + * @return list 实际生效的、去重排序后的订阅列表 + */ + public static function replaceSubscriptions(int $connectionId, array $topics): array + { + if ($connectionId <= 0 || !isset(self::$connectionMeta[$connectionId])) { + return []; + } + + foreach (self::$connectionMeta[$connectionId]['topics'] as $oldTopic) { + if (isset(self::$topicIndex[$oldTopic][$connectionId])) { + unset(self::$topicIndex[$oldTopic][$connectionId]); + if (self::$topicIndex[$oldTopic] === []) { + unset(self::$topicIndex[$oldTopic]); + } + } + } + + $clean = []; + foreach ($topics as $t) { + if (!is_string($t)) { + continue; + } + $v = trim($t); + if ($v === '' || strlen($v) > 64) { + continue; + } + $clean[$v] = true; + } + $finalTopics = array_keys($clean); + sort($finalTopics); + + self::$connectionMeta[$connectionId]['topics'] = $finalTopics; + foreach ($finalTopics as $topic) { + self::$topicIndex[$topic][$connectionId] = true; + } + + return $finalTopics; + } + + /** + * 获取订阅了指定 topic 的所有 connection_id。 + * + * @return list + */ + public static function connectionsForTopic(string $topic): array + { + $topic = trim($topic); + if ($topic === '' || !isset(self::$topicIndex[$topic])) { + return []; + } + + return array_keys(self::$topicIndex[$topic]); + } + + /** + * 标记连接活跃时间(接收任意消息时调用,用于心跳超时判断)。 + */ + public static function touch(int $connectionId): void + { + if (isset(self::$connectionMeta[$connectionId])) { + self::$connectionMeta[$connectionId]['last_seen_at'] = time(); + } + } + + /** + * @return array{topics: list, user_id: int, last_seen_at: int, remote_ip: string}|null + */ + public static function meta(int $connectionId): ?array + { + return self::$connectionMeta[$connectionId] ?? null; + } + + public static function userIdOf(int $connectionId): int + { + return self::$connectionMeta[$connectionId]['user_id'] ?? 0; + } + + /** + * 找出所有 last_seen_at 早于 $cutoff 的连接 id(用于服务端主动关闭僵尸连接)。 + * + * @return list + */ + public static function staleConnections(int $cutoff): array + { + $out = []; + foreach (self::$connectionMeta as $cid => $meta) { + if (($meta['last_seen_at'] ?? 0) < $cutoff) { + $out[] = $cid; + } + } + return $out; + } + + /** + * 当前活跃连接数(运维/诊断用)。 + */ + public static function connectionCount(): int + { + return count(self::$connectionMeta); + } + + /** + * 当前活跃订阅总数(运维/诊断用)。 + */ + public static function subscriptionCount(): int + { + $sum = 0; + foreach (self::$topicIndex as $conns) { + $sum += count($conns); + } + return $sum; + } + + /** + * 仅供测试/进程重启时清空索引。 + */ + public static function reset(): void + { + self::$topicIndex = []; + self::$connectionMeta = []; + } +} diff --git a/app/process/GameWebSocketServer.php b/app/process/GameWebSocketServer.php index d73be55..69770bb 100644 --- a/app/process/GameWebSocketServer.php +++ b/app/process/GameWebSocketServer.php @@ -4,26 +4,228 @@ declare(strict_types=1); namespace app\process; -use app\common\service\GameWebSocketEventBus; use app\common\service\GameLiveService; +use app\common\service\GameWebSocketAuthHelper; +use app\common\service\GameWebSocketDispatcher; +use app\common\service\GameWebSocketEventBus; use app\common\service\GameWebSocketPayloadHelper; +use app\common\service\GameWebSocketSubscriptionRegistry; +use support\Log; +use Throwable; use Workerman\Connection\TcpConnection; use Workerman\Timer; /** - * 后台测试页 WebSocket 服务(仅用于连接联调) + * H5/后台 WebSocket 服务(重构版,2026-05) + * + * 设计与 docs/36字花-移动端接口设计草案.md §7 对齐: + * + * - 握手鉴权(GameWebSocketAuthHelper) + * - mobile:URL Query `auth_token` + `user_token`,绑定 user_id,user 级主题按 user_id 过滤 + * - admin: URL Query `admin_ws_token`(后台 wsConfig 签发,写 Redis 短时签名),user_id=0, + * user 级主题不过滤(运维/联调可观测全量) + * - 客户端 -> 服务端:`{"action":"ping"}` / `{"action":"subscribe","topics":[...]}` + * - 服务端 -> 客户端:`ws.connected` / `ws.subscribed` / `pong` / `ws.error` / 业务事件帧 + * - 主动心跳超时:连接 60s 内无任何上行报文(含 ping)即被 server 主动 close, + * 触发客户端重连,避免半关闭僵尸连接堵在分发表里 + * - 事件分发由 GameWebSocketDispatcher 负责;每次 publish/consume/dispatch/send/订阅/关闭 + * 都写入独立日志通道 runtime/logs/ws.log + * + * 高可用: + * - 消费 Timer 的 popBatch 异常已被 EventBus 内部捕获并记录,下次 tick 自动恢复 + * - 任一连接 send 异常不会影响其它连接的下发(Dispatcher 内单连接 try/catch) + * - admin.live.snapshot 兜底直推 Timer 与队列消费解耦,Redis 异常时后台仍能看到对局状态 */ class GameWebSocketServer { - /** @var array */ + /** @var array connection_id => TcpConnection(仅本进程内) */ private static array $connections = []; + /** @var array> connection_id => 鉴权元数据(mode/admin_id 等,便于日志/排查) */ + private static array $connectionAuth = []; + private static bool $eventBusConsumerStarted = false; private static bool $adminSnapshotTickerStarted = false; + private static bool $heartbeatCheckerStarted = false; + + /** 客户端 60s 内无任何上行报文(含 ping)即被主动断开 */ + private const HEARTBEAT_IDLE_SECONDS = 60; + + /** 事件队列每 N ms 拉一次(默认 1s;与文档 §7.1.3 一致) */ + private const QUEUE_TICK_INTERVAL = 1; + + /** 心跳超时检查间隔(秒) */ + private const HEARTBEAT_CHECK_INTERVAL = 10; + + public function onWorkerStart(): void + { + Log::channel('ws')->info('ws worker start', [ + 'pid' => function_exists('posix_getpid') ? posix_getpid() : getmypid(), + ]); + self::ensureEventBusConsumer(); + self::ensureAdminLiveSnapshotTicker(); + self::ensureHeartbeatChecker(); + } + + public function onConnect(TcpConnection $connection): void + { + $connection->topics = []; + $connection->wsAuth = null; + self::$connections[$connection->id] = $connection; + Log::channel('ws')->debug('tcp onConnect', [ + 'connection_id' => $connection->id, + 'remote' => method_exists($connection, 'getRemoteIp') ? $connection->getRemoteIp() : '', + ]); + } /** - * 从 Redis 队列拉取事件并推送给已订阅连接。 - * 部分环境下 WebSocket 进程的 onWorkerStart 可能未触发,因此在首帧握手处也会兜底启动一次(全局仅注册一个 Timer)。 + * Workerman 在 WebSocket 握手时回调;$request 可能是 string(HTTP raw)或 Webman\Http\Request 对象, + * 这里仅尝试从 connection 内置变量取 QueryString 做最大兼容。 + */ + public function onWebSocketConnect(TcpConnection $connection, mixed $request = null): void + { + self::ensureEventBusConsumer(); + self::ensureAdminLiveSnapshotTicker(); + self::ensureHeartbeatChecker(); + + $queryString = self::extractQueryString($connection, $request); + $query = GameWebSocketAuthHelper::parseQueryString($queryString); + + $auth = GameWebSocketAuthHelper::authorize($query); + $remoteIp = self::remoteIp($connection); + + if (!$auth['ok']) { + Log::channel('ws')->warning('handshake denied', [ + 'connection_id' => $connection->id, + 'remote_ip' => $remoteIp, + 'reason' => $auth['reason'], + 'query_keys' => array_keys($query), + ]); + GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [ + 'code' => 1101, + 'message' => 'Authentication failed: ' . ($auth['reason'] ?: 'unauthorized'), + ], 'handshake_denied'); + try { + $connection->close(); + } catch (Throwable) { + } + return; + } + + $connection->wsAuth = $auth; + self::$connectionAuth[$connection->id] = $auth; + GameWebSocketSubscriptionRegistry::registerConnection($connection->id, (int) $auth['user_id'], $remoteIp); + + Log::channel('ws')->info('handshake ok', [ + 'connection_id' => $connection->id, + 'remote_ip' => $remoteIp, + 'mode' => $auth['mode'], + 'user_id' => $auth['user_id'], + 'admin_id' => $auth['admin_id'], + ]); + + GameWebSocketDispatcher::sendDirect($connection, 'ws.connected', [ + 'message' => 'WebSocket connected', + 'connection_id' => $connection->id, + 'mode' => $auth['mode'], + 'user_id' => $auth['user_id'], + 'server_time' => time(), + 'heartbeat_interval' => 30, + 'idle_timeout' => self::HEARTBEAT_IDLE_SECONDS, + ], 'handshake_ok'); + } + + public function onMessage(TcpConnection $connection, string $payload): void + { + GameWebSocketSubscriptionRegistry::touch($connection->id); + + $decoded = json_decode($payload, true); + if (!is_array($decoded)) { + Log::channel('ws')->debug('invalid json payload', [ + 'connection_id' => $connection->id, + 'payload_size' => strlen($payload), + ]); + GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [ + 'message' => 'Invalid JSON payload', + ], 'invalid_json'); + return; + } + + $action = isset($decoded['action']) && is_string($decoded['action']) ? trim($decoded['action']) : ''; + + if ($action === 'ping') { + GameWebSocketDispatcher::sendDirect($connection, 'pong', [ + 'server_time' => date('Y-m-d H:i:s'), + 'server_time_ts' => time(), + ], 'pong'); + return; + } + + if ($action === 'subscribe') { + $rawTopics = $decoded['topics'] ?? []; + $rawList = is_array($rawTopics) ? $rawTopics : []; + $finalTopics = GameWebSocketSubscriptionRegistry::replaceSubscriptions($connection->id, $rawList); + Log::channel('ws')->info('subscribe', [ + 'connection_id' => $connection->id, + 'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($connection->id), + 'topics' => $finalTopics, + 'requested_count' => is_array($rawTopics) ? count($rawTopics) : 0, + ]); + GameWebSocketDispatcher::sendDirect($connection, 'ws.subscribed', [ + 'topics' => $finalTopics, + ], 'subscribed'); + self::pushAdminTestOddsPreview($connection, $finalTopics); + return; + } + + Log::channel('ws')->debug('unsupported action', [ + 'connection_id' => $connection->id, + 'action' => $action, + ]); + GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [ + 'message' => 'Unsupported action', + 'received_action' => $action, + ], 'unsupported_action'); + } + + public function onError(TcpConnection $connection, int $code, string $msg): void + { + Log::channel('ws')->warning('connection error', [ + 'connection_id' => $connection->id, + 'code' => $code, + 'detail' => $msg, + ]); + try { + $connection->send(json_encode([ + 'event' => 'ws.error', + 'message' => 'Server internal error', + 'code' => $code, + 'detail' => $msg, + ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); + } catch (Throwable) { + } + } + + public function onClose(TcpConnection $connection): void + { + $auth = self::$connectionAuth[$connection->id] ?? []; + Log::channel('ws')->info('connection close', [ + 'connection_id' => $connection->id, + 'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($connection->id), + 'mode' => $auth['mode'] ?? '', + 'remaining_connections' => max(0, GameWebSocketSubscriptionRegistry::connectionCount() - 1), + ]); + GameWebSocketSubscriptionRegistry::unregisterConnection($connection->id); + unset(self::$connections[$connection->id], self::$connectionAuth[$connection->id]); + } + + // ============================================================ + // 内部:Timer / 兜底 + // ============================================================ + + /** + * 每秒拉取 Redis 队列事件并分发到对应订阅连接。 + * popBatch 内部已捕获 Redis 异常并写 ws 日志,Timer 不会因此停跑。 */ private static function ensureEventBusConsumer(): void { @@ -31,41 +233,36 @@ class GameWebSocketServer return; } self::$eventBusConsumerStarted = true; - Timer::add(1, static function (): void { - $events = GameWebSocketEventBus::popBatch(); - if ($events === []) { - return; - } - foreach ($events as $event) { - $topic = $event['topic'] ?? ''; - if (!is_string($topic) || $topic === '') { - continue; + Timer::add(self::QUEUE_TICK_INTERVAL, static function (): void { + try { + $events = GameWebSocketEventBus::popBatch(); + if ($events === []) { + return; } - $eventName = $event['event'] ?? $topic; - $data = $event['data'] ?? []; - if (!is_array($data)) { - $data = []; - } - $serverTime = $event['server_time'] ?? time(); - foreach (self::$connections as $connection) { - $topics = $connection->topics ?? []; - if (!is_array($topics) || !in_array($topic, $topics, true)) { - continue; + foreach ($events as $event) { + try { + GameWebSocketDispatcher::dispatch($event, self::$connections); + } catch (Throwable $e) { + Log::channel('ws')->error('dispatch loop exception', [ + 'topic' => $event['topic'] ?? '', + 'error' => $e->getMessage(), + ]); } - $connection->send(json_encode([ - 'event' => $eventName, - 'topic' => $topic, - 'data' => $data, - 'server_time' => $serverTime, - ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); } + } catch (Throwable $e) { + Log::channel('ws')->error('event bus consumer tick exception', [ + 'error' => $e->getMessage(), + ]); } }); + Log::channel('ws')->info('event bus consumer started', [ + 'tick_interval_seconds' => self::QUEUE_TICK_INTERVAL, + ]); } /** - * 兜底直推:admin.live.snapshot 每秒主动构建并广播。 - * 目的:即使 Redis 队列不可用,也能保证 /admin/game/live 实时看到对局变化。 + * admin.live.snapshot 兜底:每秒构建一次快照直接推送(不依赖 Redis 队列)。 + * 用于 /admin/game/live 实时对局页,确保 Redis 异常时后台仍能看到对局状态。 */ private static function ensureAdminLiveSnapshotTicker(): void { @@ -74,18 +271,18 @@ class GameWebSocketServer } self::$adminSnapshotTickerStarted = true; Timer::add(1, static function (): void { - $hasAdminSubscriber = false; - foreach (self::$connections as $connection) { - $topics = $connection->topics ?? []; - if (is_array($topics) && in_array('admin.live.snapshot', $topics, true)) { - $hasAdminSubscriber = true; - break; - } - } - if (!$hasAdminSubscriber) { + $cids = GameWebSocketSubscriptionRegistry::connectionsForTopic('admin.live.snapshot'); + if ($cids === []) { + return; + } + try { + $snapshot = GameLiveService::buildSnapshot(null); + } catch (Throwable $e) { + Log::channel('ws')->error('admin snapshot build failed', [ + 'error' => $e->getMessage(), + ]); return; } - $snapshot = GameLiveService::buildSnapshot(null); $payload = json_encode([ 'event' => 'admin.live.snapshot', 'topic' => 'admin.live.snapshot', @@ -95,106 +292,60 @@ class GameWebSocketServer if (!is_string($payload) || $payload === '') { return; } - foreach (self::$connections as $connection) { - $topics = $connection->topics ?? []; - if (!is_array($topics) || !in_array('admin.live.snapshot', $topics, true)) { + $sent = 0; + $failed = 0; + foreach ($cids as $cid) { + if (!isset(self::$connections[$cid])) { continue; } - $connection->send($payload); + try { + self::$connections[$cid]->send($payload); + $sent++; + } catch (Throwable $e) { + $failed++; + Log::channel('ws')->warning('admin snapshot send failed', [ + 'connection_id' => $cid, + 'error' => $e->getMessage(), + ]); + } } }); } - public function onWorkerStart(): void + /** + * 心跳超时检查器:每 10s 扫描一次,关闭 60s 无上行报文的连接。 + * 触发客户端重连,避免半关闭僵尸连接持续接收推送但不实际送达。 + */ + private static function ensureHeartbeatChecker(): void { - self::ensureEventBusConsumer(); - self::ensureAdminLiveSnapshotTicker(); - } - - public function onConnect(TcpConnection $connection): void - { - $connection->topics = []; - self::$connections[$connection->id] = $connection; - } - - public function onWebSocketConnect(TcpConnection $connection): void - { - self::ensureEventBusConsumer(); - self::ensureAdminLiveSnapshotTicker(); - $connection->send(json_encode([ - 'event' => 'ws.connected', - 'message' => 'WebSocket connected', - 'connection_id' => $connection->id, - 'server_time' => time(), - 'heartbeat_interval' => 30, - ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); - } - - public function onMessage(TcpConnection $connection, string $payload): void - { - $decoded = json_decode($payload, true); - if (!is_array($decoded)) { - $connection->send(json_encode([ - 'event' => 'ws.error', - 'message' => 'Invalid JSON payload', - ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); + if (self::$heartbeatCheckerStarted) { return; } - - $action = $decoded['action'] ?? ''; - if ($action === 'ping') { - $connection->send(json_encode([ - 'event' => 'pong', - 'server_time' => date('Y-m-d H:i:s'), - ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); - return; - } - - if ($action === 'subscribe') { - $topics = $decoded['topics'] ?? []; - $sanitized = []; - if (is_array($topics)) { - foreach ($topics as $topic) { - if (!is_string($topic)) { - continue; + self::$heartbeatCheckerStarted = true; + Timer::add(self::HEARTBEAT_CHECK_INTERVAL, static function (): void { + $cutoff = time() - self::HEARTBEAT_IDLE_SECONDS; + $stale = GameWebSocketSubscriptionRegistry::staleConnections($cutoff); + if ($stale === []) { + return; + } + foreach ($stale as $cid) { + Log::channel('ws')->info('close idle connection', [ + 'connection_id' => $cid, + 'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($cid), + 'idle_seconds' => self::HEARTBEAT_IDLE_SECONDS, + ]); + if (isset(self::$connections[$cid])) { + try { + self::$connections[$cid]->close(); + } catch (Throwable) { } - $value = trim($topic); - if ($value === '') { - continue; - } - $sanitized[] = $value; } } - $connection->topics = array_values(array_unique($sanitized)); - $connection->send(json_encode([ - 'event' => 'ws.subscribed', - 'topics' => $connection->topics, - ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); - self::pushAdminTestOddsPreview($connection, $connection->topics); - return; - } - - $connection->send(json_encode([ - 'event' => 'ws.error', - 'message' => 'Unsupported action', - 'received_action' => $action, - ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); - } - - public function onError(TcpConnection $connection, int $code, string $msg): void - { - $connection->send(json_encode([ - 'event' => 'ws.error', - 'message' => 'Server internal error', - 'code' => $code, - 'detail' => $msg, - ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); - } - - public function onClose(TcpConnection $connection): void - { - $connection->topics = []; - unset(self::$connections[$connection->id]); + }); + Log::channel('ws')->info('heartbeat checker started', [ + 'idle_seconds' => self::HEARTBEAT_IDLE_SECONDS, + 'check_interval' => self::HEARTBEAT_CHECK_INTERVAL, + ]); } /** @@ -210,12 +361,59 @@ class GameWebSocketServer } $serverTime = time(); foreach ($frames as $frame) { - $connection->send(json_encode([ - 'event' => $frame['event'], + GameWebSocketDispatcher::sendDirect($connection, $frame['event'], [ 'topic' => $frame['topic'], 'data' => $frame['data'], 'server_time' => $serverTime, - ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); + ], 'admin_test_preview'); } } + + /** + * 兼容多种 Workerman 版本:尝试取握手 URI 的 query 部分。 + * Workerman v5 中 onWebSocketConnect 第二参数已是 array $headers/$request;这里尽量兼容字符串与对象。 + */ + private static function extractQueryString(TcpConnection $connection, mixed $request): string + { + if (isset($connection->wsHandshakeQuery) && is_string($connection->wsHandshakeQuery)) { + return (string) $connection->wsHandshakeQuery; + } + if (isset($connection->onWebSocketConnect)) { + // noop + } + $serverUri = ''; + if (isset($_SERVER['REQUEST_URI']) && is_string($_SERVER['REQUEST_URI'])) { + $serverUri = $_SERVER['REQUEST_URI']; + } + if ($serverUri === '' && isset($connection->headers) && is_array($connection->headers)) { + $line = $connection->headers['get'] ?? $connection->headers['GET'] ?? ''; + if (is_string($line)) { + $serverUri = $line; + } + } + if ($serverUri === '' && is_string($request)) { + if (preg_match('#^GET\s+([^\s]+)#i', $request, $m) === 1) { + $serverUri = $m[1]; + } + } + if ($serverUri === '' && is_object($request) && method_exists($request, 'queryString')) { + $serverUri = '?' . (string) $request->queryString(); + } + $qPos = strpos($serverUri, '?'); + if ($qPos === false) { + return ''; + } + return substr($serverUri, $qPos + 1); + } + + private static function remoteIp(TcpConnection $connection): string + { + try { + if (method_exists($connection, 'getRemoteIp')) { + return (string) $connection->getRemoteIp(); + } + } catch (Throwable) { + } + return ''; + } } diff --git a/config/log.php b/config/log.php index 7f05de5..32ba1aa 100644 --- a/config/log.php +++ b/config/log.php @@ -29,4 +29,26 @@ return [ ] ], ], + /** + * 游戏 WebSocket 推送链路专用日志(独立通道): + * - 写入 runtime/logs/ws-YYYY-MM-DD.log,保留 7 天 + * - 通过 Log::channel('ws')->info(...) 访问 + * - 记录维度:publish 入队、queue 消费、按 topic/user_id 分发、心跳与连接生命周期、握手鉴权失败等 + */ + 'ws' => [ + 'handlers' => [ + [ + 'class' => Monolog\Handler\RotatingFileHandler::class, + 'constructor' => [ + runtime_path() . '/logs/ws.log', + 7, + Monolog\Logger::DEBUG, + ], + 'formatter' => [ + 'class' => Monolog\Formatter\LineFormatter::class, + 'constructor' => [null, 'Y-m-d H:i:s', true], + ], + ], + ], + ], ]; diff --git a/docs/36字花-移动端接口设计草案.md b/docs/36字花-移动端接口设计草案.md index f0d3197..71c9540 100644 --- a/docs/36字花-移动端接口设计草案.md +++ b/docs/36字花-移动端接口设计草案.md @@ -764,22 +764,35 @@ - **移动端配置缺口**:**`POST /api/game/lobbyInit` 当前不下发 WebSocket 地址**;H5 需与运维约定同一套 `H5_WEBSOCKET_URL`(打包进前端配置、远程配置中心等),与 HTTP API 基址可不同域。 - **混合内容**:若 H5 页面为 **HTTPS**,浏览器要求 WebSocket 使用 **`wss://`**,否则会被拦截。 - **事件投递依赖 Redis**:HTTP 侧业务通过 **`GameWebSocketEventBus`**(Redis 列表)将事件投递到 WebSocket 进程;Redis 不可用或队列异常时,**除 `admin.live.snapshot` 外**的广播类推送可能收不到。后台若订阅了 `admin.live.snapshot`,服务端有**每秒直连构建快照**的兜底,不依赖队列。 -- **鉴权(重要)**:**当前 `GameWebSocketServer` 在握手阶段不校验** URL Query 中的 `token` / `auth_token` / `user-token` 等;**任何人拿到地址即可建立连接**(与 §1 HTTP 接口必须 `auth-token` + `user-token` 不同)。Query 中上述参数为**预留/习惯写法**,便于后续若要在 `onWebSocketConnect` 中实现鉴权再与 HTTP 对齐;**文档中若写「未登录返回 1101」属规划口径,非现网行为**。 +- **握手鉴权(2026-05 重构后强制)**:`GameWebSocketServer::onWebSocketConnect` 通过 `GameWebSocketAuthHelper::authorize` 校验 URL Query。两种合法身份: + - **mobile(H5/移动端)**:必须同时携带 `auth_token`(同 HTTP `auth-token`)+ `user_token`(同 HTTP `user-token`,亦支持 `token` 同义)。校验通过后连接被绑定 `user_id`,分发器仅向其推送本人的 user 级主题。 + - **admin(后台/运维)**:必须携带 `admin_ws_token`(由后台 `wsConfig` 接口签发,写入 Redis Key `dfw:v1:ws:admin_token:{token}`,默认 TTL 7200s)。后台已 `wsConfig` 中把该 token 拼到 `ws_url` 一并返回,前端透传即可;admin 模式 `user_id=0`,可订阅任意主题并收到**全量** user 级推送(运维联调用)。 + - 任一身份不通过 → 服务端发送 `{"event":"ws.error","code":1101,"message":"Authentication failed: ..."}` 并立即 `close`。 +- **服务端按 user_id 过滤(user 级主题)**:以下 topic 的 `data.user_id` 必须 **等于** 当前连接绑定的 `user_id` 才会下发——**`bet.win` / `user.streak` / `wallet.changed` / `bet.accepted` / `auto.spin.progress`**。其它 topic(`period.tick` / `period.opened` / `jackpot.hit` / `admin.*` 等)按订阅广播。admin 模式不参与此过滤。 +- **心跳超时(服务端主动)**:连接 60s 内无任何上行报文(含 `ping`/`subscribe`)即被 server 主动 `close`,触发客户端走重连流程;避免半关闭的僵尸连接长期持有订阅却不能实际送达推送。 +- **独立日志通道 `ws`**:`runtime/logs/ws.log`(保留 7 天)。记录维度包含 `publish 入队 / popBatch 异常 / dispatch(topic/candidates/matched/skipped_not_owner/skipped_closed/send_failed)/ handshake_ok | denied / subscribe / pong / close idle / send failed` 等。排查"为什么没收到推送"时优先看此文件。 - **订阅才有业务推送**:建连后仅会收到握手首帧(见下)及本连接已订阅主题的消息;不发送 `subscribe` 则收不到 `period.tick` 等(`admin.live.snapshot` 同上,需显式订阅)。 ### 7.1 WebSocket 连接与消息 - **连接地址**:见 **§7.0**(环境变量 `H5_WEBSOCKET_URL` 或后台 `wsConfig` 返回的 `ws_url`) - **客户端**:浏览器原生 `WebSocket`(`ws://` / `wss://`) -- **连接时携带参数(可选 / 预留)**: - - URL Query 可带 `token`(与 HTTP 头 `user-token` 同义习惯)、`auth_token`(与 HTTP 头 `auth-token` 同义习惯)、`device_id`、`lang` 等,**当前服务端不解析、不校验**;若后续版本实现握手鉴权,以发布说明为准。 - - 示例(习惯写法):`wss://ws.example.com/ws?token=xxx&auth_token=xxx&device_id=ios_001&lang=zh` +- **连接时必带 Query 参数(2026-05 起强制)**: + - **H5/移动端**:`auth_token=` + `user_token=`(亦支持 `token` 同义)。`device_id`、`lang` 仍可携带,但服务端不强制。 + - **后台**:`admin_ws_token=`(后台 `wsConfig` 已直接把它拼到 `ws_url`,前端透传即可)。 + - 示例: + - H5:`wss://ws.example.com/ws/?auth_token=xxx&user_token=yyy&device_id=ios_001&lang=zh` + - 后台:`wss://ws.example.com/ws/?admin_ws_token=zzz` + - 缺失任一必填字段或 token 失效 → 服务端回 `{"event":"ws.error","code":1101,...}` 后立即关闭连接。 - **连接成功首帧(当前实现)**: - `event`:`ws.connected` - `message`:固定文案 `WebSocket connected`(便于联调日志) - `connection_id`:连接唯一标识(进程内) + - `mode`:`mobile` | `admin`(2026-05 新增;表明本连接的鉴权身份) + - `user_id`:int(2026-05 新增;mobile 模式为真实玩家 id,admin 模式为 0) - `server_time`:服务器时间戳(**秒**,int) - `heartbeat_interval`:建议心跳间隔(**秒**,当前实现固定为 `30`) + - `idle_timeout`:服务端主动关闭的空闲秒数(**秒**,当前实现固定为 `60`;客户端 `idle_timeout - 心跳间隔` 内必须发出 `ping`,否则会被 server 主动 `close`) - **连接后错误帧(当前实现,非 HTTP 业务码)**: - JSON 无法解析:`event`=`ws.error`,`message`=`Invalid JSON payload`(无 `code` 或与 HTTP `code` 不同体系) - 未知 `action`:`event`=`ws.error`,`message`=`Unsupported action`,并可能带 `received_action` @@ -807,8 +820,10 @@ #### 7.1.2 订阅行为说明 - **仅建立连接不会自动下发全部业务消息**;客户端需要发送 `subscribe` 明确订阅主题。 -- 成功订阅后服务端返回:`{"event":"ws.subscribed","topics":[...]}`。 +- 成功订阅后服务端返回:`{"event":"ws.subscribed","topics":[...]}`(已去重、按字典序排序,与提交顺序无关)。 +- **`subscribe` 覆盖式生效**:每次发送都会**完全替换**该连接的订阅集合(不是累加)。需要追加请把已有列表一并发上来。 - 若未订阅主题,通常只能收到握手首帧(`ws.connected`)和心跳回包(`pong`)。 +- **服务端按 user_id 过滤**:mobile 模式连接只会收到 `data.user_id == 自己 user_id` 的 user 级主题(见 §7.0 列表);admin 模式不过滤,收到全量。**客户端仍应做一次防御性 `user_id` 过滤**,避免后续接口变更带来误处理。 - **不下发** `streak_win_reward` 全表(1~10 档);赔率仅通过 `user.streak` / `wallet.changed` / `bet.accepted` 及 `lobbyInit.user_snapshot` 推送**当前登录玩家**本局适用字段。 #### 7.1.2A 连胜赔率与连胜场次(WebSocket) diff --git a/scripts/smoke_ws_auth_helper.php b/scripts/smoke_ws_auth_helper.php new file mode 100644 index 0000000..c743e89 --- /dev/null +++ b/scripts/smoke_ws_auth_helper.php @@ -0,0 +1,59 @@ + 'whatever']); +echo json_encode($r, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), PHP_EOL, PHP_EOL; + +echo "[3] authorize: invalid auth-token\n"; +$r = GameWebSocketAuthHelper::authorize(['auth_token' => 'not_existing_token', 'user_token' => 'not_existing_user_token']); +echo json_encode($r, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), PHP_EOL, PHP_EOL; + +echo "[4] authorize: invalid admin_ws_token bypass\n"; +$r = GameWebSocketAuthHelper::authorize(['admin_ws_token' => 'no_such_admin_token_xxxxxx']); +echo json_encode($r, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), PHP_EOL, PHP_EOL; + +echo "[5] issue + validate admin_ws_token\n"; +$issued = GameWebSocketAuthHelper::issueAdminWsToken(99, 60); +echo 'issued: ', json_encode($issued, JSON_UNESCAPED_UNICODE), PHP_EOL; +if (($issued['token'] ?? '') !== '') { + $r = GameWebSocketAuthHelper::authorize(['admin_ws_token' => $issued['token']]); + echo 'authorize ok with issued token: ', json_encode($r, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), PHP_EOL; +} else { + echo "WARN: issueAdminWsToken returned empty (Redis 不可用?)\n"; +} +echo PHP_EOL; + +echo "[6] SubscriptionRegistry\n"; +GameWebSocketSubscriptionRegistry::reset(); +GameWebSocketSubscriptionRegistry::registerConnection(10, 14, '127.0.0.1'); +GameWebSocketSubscriptionRegistry::registerConnection(11, 0, '127.0.0.1'); +$t1 = GameWebSocketSubscriptionRegistry::replaceSubscriptions(10, ['bet.win', 'period.opened', 'period.opened', '', ' ', 'bet.win']); +$t2 = GameWebSocketSubscriptionRegistry::replaceSubscriptions(11, ['admin.live.snapshot', 'period.opened']); +echo 'cid=10 topics=', json_encode($t1), PHP_EOL; +echo 'cid=11 topics=', json_encode($t2), PHP_EOL; +echo 'period.opened subscribers=', json_encode(GameWebSocketSubscriptionRegistry::connectionsForTopic('period.opened')), PHP_EOL; +echo 'bet.win subscribers=', json_encode(GameWebSocketSubscriptionRegistry::connectionsForTopic('bet.win')), PHP_EOL; +echo 'admin.live.snapshot subscribers=', json_encode(GameWebSocketSubscriptionRegistry::connectionsForTopic('admin.live.snapshot')), PHP_EOL; +GameWebSocketSubscriptionRegistry::unregisterConnection(10); +echo 'after unregister(10), bet.win subscribers=', json_encode(GameWebSocketSubscriptionRegistry::connectionsForTopic('bet.win')), PHP_EOL; +echo 'after unregister(10), period.opened subscribers=', json_encode(GameWebSocketSubscriptionRegistry::connectionsForTopic('period.opened')), PHP_EOL; +echo 'remaining connection count=', GameWebSocketSubscriptionRegistry::connectionCount(), PHP_EOL; + +echo "\nALL DONE\n"; diff --git a/web/src/views/backend/game/live/index.vue b/web/src/views/backend/game/live/index.vue index ecb7845..e5628ff 100644 --- a/web/src/views/backend/game/live/index.vue +++ b/web/src/views/backend/game/live/index.vue @@ -463,7 +463,16 @@ function connectWs(): void { socket.onclose = () => { wsConnected.value = false wsClient.value = null - window.setTimeout(() => { + // 断线后:先刷新 wsConfig 拿新的 admin_ws_token(避免握手 token 已过期反复失败),再重连 + window.setTimeout(async () => { + if (wsConnected.value) { + return + } + try { + await reloadWsConfig() + } catch { + /* ignore;下次重连时再试 */ + } if (!wsConnected.value) { connectWs() }