1.重构websocket连接

This commit is contained in:
2026-05-27 10:28:39 +08:00
parent a7c2a29764
commit 8f5ba977a4
12 changed files with 1101 additions and 144 deletions

View File

@@ -6,6 +6,7 @@ use app\common\controller\Backend;
use app\common\library\admin\WebSocketConfigHelper; use app\common\library\admin\WebSocketConfigHelper;
use app\common\service\GameLiveService; use app\common\service\GameLiveService;
use app\common\service\GameRecordService; use app\common\service\GameRecordService;
use app\common\service\GameWebSocketAuthHelper;
use support\Response; use support\Response;
use Webman\Http\Request as WebmanRequest; use Webman\Http\Request as WebmanRequest;
@@ -67,9 +68,19 @@ class Live extends Backend
'auto.spin.progress', 'auto.spin.progress',
]; ];
$adminId = $this->auth ? (int) ($this->auth->id ?? 0) : 0;
$adminWs = GameWebSocketAuthHelper::issueAdminWsToken($adminId);
$baseWsUrl = WebSocketConfigHelper::wsUrl($request);
$wsUrl = WebSocketConfigHelper::appendTokensToWsUrl($baseWsUrl, [
'admin_ws_token' => (string) ($adminWs['token'] ?? ''),
]);
return $this->success('', [ return $this->success('', [
'name' => 'ws.admin.live', 'name' => 'ws.admin.live',
'ws_url' => WebSocketConfigHelper::wsUrl($request), 'ws_url' => $wsUrl,
'ws_base_url' => $baseWsUrl,
'admin_ws_token' => (string) ($adminWs['token'] ?? ''),
'admin_ws_token_ttl' => (int) ($adminWs['ttl'] ?? 0),
'connect_tip' => 'The admin live page auto-subscribes topics for status, draw result and payout events.', 'connect_tip' => 'The admin live page auto-subscribes topics for status, draw result and payout events.',
'subscribe_topics' => $topics, 'subscribe_topics' => $topics,
'sample_messages' => [ 'sample_messages' => [

View File

@@ -6,6 +6,7 @@ namespace app\admin\controller\test;
use app\common\controller\Backend; use app\common\controller\Backend;
use app\common\library\admin\WebSocketConfigHelper; use app\common\library\admin\WebSocketConfigHelper;
use app\common\service\GameWebSocketAuthHelper;
use app\common\service\GameWebSocketPayloadHelper; use app\common\service\GameWebSocketPayloadHelper;
use support\Response; use support\Response;
use Webman\Http\Request as WebmanRequest; use Webman\Http\Request as WebmanRequest;
@@ -43,9 +44,19 @@ class GameCurrentStatus extends Backend
$oddsPushTopics = GameWebSocketPayloadHelper::ODDS_PUSH_TOPICS; $oddsPushTopics = GameWebSocketPayloadHelper::ODDS_PUSH_TOPICS;
$testPlayerOdds = GameWebSocketPayloadHelper::adminTestPlayerOddsSnapshot(); $testPlayerOdds = GameWebSocketPayloadHelper::adminTestPlayerOddsSnapshot();
$adminId = $this->auth ? (int) ($this->auth->id ?? 0) : 0;
$adminWs = GameWebSocketAuthHelper::issueAdminWsToken($adminId);
$baseWsUrl = WebSocketConfigHelper::wsUrl($request);
$wsUrl = WebSocketConfigHelper::appendTokensToWsUrl($baseWsUrl, [
'admin_ws_token' => (string) ($adminWs['token'] ?? ''),
]);
return $this->success('', [ return $this->success('', [
'name' => 'ws.period', 'name' => 'ws.period',
'ws_url' => WebSocketConfigHelper::wsUrl($request), 'ws_url' => $wsUrl,
'ws_base_url' => $baseWsUrl,
'admin_ws_token' => (string) ($adminWs['token'] ?? ''),
'admin_ws_token_ttl' => (int) ($adminWs['ttl'] ?? 0),
'connect_tip' => '连接成功后将自动订阅下列主题。真实业务仅在有玩家下注/结算时推送赔率;本页联调会在订阅后额外推送带 is_test/preview 的演示帧(见下方测试玩家赔率)。', 'connect_tip' => '连接成功后将自动订阅下列主题。真实业务仅在有玩家下注/结算时推送赔率;本页联调会在订阅后额外推送带 is_test/preview 的演示帧(见下方测试玩家赔率)。',
'subscribe_topics' => $subscribeTopics, 'subscribe_topics' => $subscribeTopics,
'odds_push_topics' => $oddsPushTopics, 'odds_push_topics' => $oddsPushTopics,

View File

@@ -40,6 +40,33 @@ final class WebSocketConfigHelper
return 'ws://127.0.0.1:3131/ws/'; return 'ws://127.0.0.1:3131/ws/';
} }
/**
* 在基础 ws_url 上拼接握手鉴权 Query
* - 后台用auth_token + admin_ws_token可观测全量主题无 user_id 过滤)
* - H5 用:调用方传 user_token与 auth_token 一起拼上去
*
* @param array{auth_token?: string, user_token?: string, admin_ws_token?: string} $tokens
*/
public static function appendTokensToWsUrl(string $wsUrl, array $tokens): string
{
$wsUrl = trim($wsUrl);
if ($wsUrl === '') {
return $wsUrl;
}
$pairs = [];
foreach (['auth_token', 'user_token', 'admin_ws_token'] as $key) {
$val = isset($tokens[$key]) && is_string($tokens[$key]) ? trim($tokens[$key]) : '';
if ($val !== '') {
$pairs[] = $key . '=' . rawurlencode($val);
}
}
if ($pairs === []) {
return $wsUrl;
}
$sep = str_contains($wsUrl, '?') ? '&' : '?';
return $wsUrl . $sep . implode('&', $pairs);
}
private static function isLoopbackWsUrl(string $url): bool private static function isLoopbackWsUrl(string $url): bool
{ {
$host = parse_url($url, PHP_URL_HOST); $host = parse_url($url, PHP_URL_HOST);

View File

@@ -0,0 +1,224 @@
<?php
declare(strict_types=1);
namespace app\common\service;
use app\common\facade\Token;
use app\common\library\Auth;
use support\Redis;
use Throwable;
/**
* WebSocket 握手鉴权助手(与 HTTP §1.3 对齐):
*
* 两种合法身份:
* 1) **mobileH5/移动端)**URL Query 必须带 `auth_token` + `user_token`,校验通过后绑定 user_id
* 分发器对 user 级主题bet.win 等)按 user_id 过滤,只发本人。
* 2) **admin后台联调/实时对局页)**URL Query 必须带 `auth_token` + `admin_ws_token`
* `admin_ws_token` 由后台 `wsConfig` 接口签发并写入 Redis短时签名。绑定 user_id=0
* 分发器对该连接不做 user 级过滤,可观测全量推送(用于运维/联调)。
*
* 任一身份通过即可建连;都不满足则拒绝握手。
*
* 返回结构:
* [
* 'ok' => bool,
* 'user_id' => int,
* 'mode' => 'mobile' | 'admin' | '',
* 'admin_id'=> int,
* 'reason' => string,
* 'auth_token' => string,
* 'user_token' => string,
* 'admin_ws_token' => string,
* ]
*/
final class GameWebSocketAuthHelper
{
/** admin_ws_token 在 Redis 中的 key 前缀value 存 admin_idTTL 由 issueAdminWsToken 决定 */
private const ADMIN_TOKEN_REDIS_PREFIX = 'dfw:v1:ws:admin_token:';
private const ADMIN_TOKEN_DEFAULT_TTL = 7200;
/**
* @param array<string, mixed> $query 解析后的 URL Query 参数
* @return array{ok:bool, user_id:int, mode:string, admin_id:int, reason:string, auth_token:string, user_token:string, admin_ws_token:string}
*/
public static function authorize(array $query): array
{
$authToken = self::pickFirstString($query, ['auth_token', 'auth-token', 'authToken']);
$userToken = self::pickFirstString($query, ['user_token', 'user-token', 'userToken', 'token']);
$adminWsToken = self::pickFirstString($query, ['admin_ws_token', 'admin-ws-token', 'adminWsToken']);
// ===== Admin 旁路:只校验 admin_ws_token由后台 wsConfig 签发,已隐含管理员身份) =====
if ($adminWsToken !== '') {
$adminId = self::validateAdminWsToken($adminWsToken);
if ($adminId > 0) {
return [
'ok' => true,
'user_id' => 0,
'mode' => 'admin',
'admin_id' => $adminId,
'reason' => '',
'auth_token' => $authToken,
'user_token' => $userToken,
'admin_ws_token' => $adminWsToken,
];
}
return self::deny('admin-ws-token invalid or expired', $authToken, $userToken, $adminWsToken);
}
// ===== MobileH5必须同时校验 auth-token + user-token =====
if ($authToken === '') {
return self::deny('missing auth-token', '', $userToken, '');
}
$authData = Token::get($authToken);
if (!is_array($authData) || ($authData['type'] ?? '') !== 'auth-token') {
return self::deny('invalid auth-token type', $authToken, $userToken, '');
}
$authExpire = filter_var($authData['expire_time'] ?? 0, FILTER_VALIDATE_INT);
if ($authExpire === false || $authExpire < time()) {
return self::deny('auth-token expired', $authToken, $userToken, '');
}
if ($userToken === '') {
return self::deny('missing user-token', $authToken, '', '');
}
$userData = Token::get($userToken);
if (!is_array($userData) || ($userData['type'] ?? '') !== Auth::TOKEN_TYPE) {
return self::deny('invalid user-token type', $authToken, $userToken, '');
}
$userExpire = filter_var($userData['expire_time'] ?? 0, FILTER_VALIDATE_INT);
if ($userExpire === false || $userExpire < time()) {
return self::deny('user-token expired', $authToken, $userToken, '');
}
$userId = filter_var($userData['user_id'] ?? 0, FILTER_VALIDATE_INT);
if ($userId === false || $userId <= 0) {
return self::deny('user-token has no user_id', $authToken, $userToken, '');
}
return [
'ok' => true,
'user_id' => (int) $userId,
'mode' => 'mobile',
'admin_id' => 0,
'reason' => '',
'auth_token' => $authToken,
'user_token' => $userToken,
'admin_ws_token' => '',
];
}
/**
* 为已登录的后台管理员签发短时 admin-ws-token返回 [token, ttl]。
* 调用方app/admin/controller/test/GameCurrentStatus::wsConfig、app/admin/controller/game/Live::wsConfig
*/
public static function issueAdminWsToken(int $adminId, ?int $ttl = null): array
{
if ($adminId <= 0) {
return ['token' => '', 'ttl' => 0];
}
$ttl = ($ttl !== null && $ttl > 0) ? $ttl : self::ADMIN_TOKEN_DEFAULT_TTL;
try {
$token = bin2hex(random_bytes(20));
} catch (Throwable) {
$token = md5(uniqid('admin_ws_', true) . microtime(true) . random_int(0, PHP_INT_MAX));
}
try {
Redis::setEx(self::ADMIN_TOKEN_REDIS_PREFIX . $token, $ttl, (string) $adminId);
} catch (Throwable) {
return ['token' => '', 'ttl' => 0];
}
return ['token' => $token, 'ttl' => $ttl];
}
/**
* 校验 admin-ws-token返回 admin_id>0 表示有效0 表示无效/过期。
*/
public static function validateAdminWsToken(string $token): int
{
$token = trim($token);
if ($token === '' || strlen($token) > 96) {
return 0;
}
try {
$raw = Redis::get(self::ADMIN_TOKEN_REDIS_PREFIX . $token);
} catch (Throwable) {
return 0;
}
if ($raw === false || $raw === null || $raw === '') {
return 0;
}
$adminId = filter_var($raw, FILTER_VALIDATE_INT);
return $adminId === false ? 0 : (int) $adminId;
}
/**
* 从 ws header 中解析 GET 行 QueryWorkerman 在 onWebSocketConnect($connection, $request) 时
* $request 可能为字符串或对象;为兼容,这里允许直接传 URI Query 字符串)。
*
* @return array<string, string>
*/
public static function parseQueryString(string $queryString): array
{
$queryString = trim($queryString);
if ($queryString === '') {
return [];
}
if ($queryString[0] === '?') {
$queryString = substr($queryString, 1);
}
$out = [];
parse_str($queryString, $out);
$clean = [];
foreach ($out as $k => $v) {
if (!is_string($k)) {
continue;
}
if (is_string($v)) {
$clean[$k] = $v;
} elseif (is_scalar($v)) {
$clean[$k] = (string) $v;
}
}
return $clean;
}
/**
* @param array<string, mixed> $query
* @param list<string> $keys
*/
private static function pickFirstString(array $query, array $keys): string
{
foreach ($keys as $k) {
if (!isset($query[$k])) {
continue;
}
$v = $query[$k];
if (!is_scalar($v)) {
continue;
}
$s = trim((string) $v);
if ($s !== '') {
return $s;
}
}
return '';
}
/**
* @return array{ok:bool, user_id:int, mode:string, admin_id:int, reason:string, auth_token:string, user_token:string, admin_ws_token:string}
*/
private static function deny(string $reason, string $authToken, string $userToken, string $adminWsToken): array
{
return [
'ok' => false,
'user_id' => 0,
'mode' => '',
'admin_id' => 0,
'reason' => $reason,
'auth_token' => $authToken,
'user_token' => $userToken,
'admin_ws_token' => $adminWsToken,
];
}
}

View File

@@ -0,0 +1,154 @@
<?php
declare(strict_types=1);
namespace app\common\service;
use support\Log;
use Throwable;
use Workerman\Connection\TcpConnection;
/**
* WebSocket 事件分发器(仅 gameWebSocketServer 进程内调用)。
*
* 职责:
* 1. 从 GameWebSocketEventBus 队列消费事件
* 2. 按 topic 反向索引取出候选 connection_id
* 3. user 级主题bet.win / user.streak / wallet.changed / bet.accepted 等)按
* data.user_id 与连接绑定 user_id 比对,仅命中本人才下发
* 4. 每一步都打 ws 日志,便于排查"为什么没收到推送"
*/
final class GameWebSocketDispatcher
{
/**
* 这些 topic 的 data.user_id 必须等于连接绑定的 user_id 才会下发;
* 其它 topicperiod.tick / period.opened / jackpot.hit / admin.* 等)一律广播给订阅者。
*
* 与 docs/36字花-移动端接口设计草案.md §7.1.2A 保持一致。
*
* @var list<string>
*/
private const USER_SCOPED_TOPICS = [
'bet.win',
'user.streak',
'wallet.changed',
'bet.accepted',
'auto.spin.progress',
];
/**
* 分发单条事件到所有命中的连接。
*
* @param array{topic:string, event:string, data:array<string,mixed>, server_time:int} $event
* @param array<int, TcpConnection> $connections connection_id => TcpConnection
*/
public static function dispatch(array $event, array $connections): void
{
$topic = $event['topic'] ?? '';
if (!is_string($topic) || $topic === '') {
return;
}
$candidateIds = GameWebSocketSubscriptionRegistry::connectionsForTopic($topic);
if ($candidateIds === []) {
Log::channel('ws')->debug('dispatch skip: no subscriber', [
'topic' => $topic,
'queue_server_time' => $event['server_time'] ?? 0,
]);
return;
}
$userScoped = in_array($topic, self::USER_SCOPED_TOPICS, true);
$payloadUserId = 0;
if ($userScoped) {
$raw = $event['data']['user_id'] ?? 0;
$parsed = filter_var($raw, FILTER_VALIDATE_INT);
$payloadUserId = $parsed === false ? 0 : (int) $parsed;
}
$frame = json_encode([
'event' => $event['event'] ?? $topic,
'topic' => $topic,
'data' => $event['data'] ?? [],
'server_time' => $event['server_time'] ?? time(),
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
if (!is_string($frame) || $frame === '') {
Log::channel('ws')->warning('dispatch skip: invalid json frame', [
'topic' => $topic,
]);
return;
}
$matched = 0;
$skippedNotOwner = 0;
$skippedClosed = 0;
$sendFailed = 0;
foreach ($candidateIds as $cid) {
if (!isset($connections[$cid])) {
$skippedClosed++;
continue;
}
if ($userScoped && $payloadUserId > 0) {
$boundUid = GameWebSocketSubscriptionRegistry::userIdOf($cid);
if ($boundUid !== $payloadUserId) {
$skippedNotOwner++;
continue;
}
}
try {
$connections[$cid]->send($frame);
$matched++;
} catch (Throwable $e) {
$sendFailed++;
Log::channel('ws')->warning('dispatch send failed', [
'topic' => $topic,
'connection_id' => $cid,
'error' => $e->getMessage(),
]);
}
}
Log::channel('ws')->info('dispatch', [
'topic' => $topic,
'user_scoped' => $userScoped,
'payload_user_id' => $payloadUserId,
'candidates' => count($candidateIds),
'matched' => $matched,
'skipped_not_owner' => $skippedNotOwner,
'skipped_closed' => $skippedClosed,
'send_failed' => $sendFailed,
'frame_size' => strlen($frame),
]);
}
/**
* 直接向某连接下发单帧(握手回执 / 订阅回执 / pong / 演示帧)。
*/
public static function sendDirect(TcpConnection $connection, string $event, array $data, string $tag = ''): void
{
$frame = json_encode(array_merge([
'event' => $event,
], $data), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
if (!is_string($frame) || $frame === '') {
return;
}
try {
$connection->send($frame);
Log::channel('ws')->debug('direct send', [
'connection_id' => $connection->id,
'event' => $event,
'tag' => $tag,
'frame_size' => strlen($frame),
]);
} catch (Throwable $e) {
Log::channel('ws')->warning('direct send failed', [
'connection_id' => $connection->id,
'event' => $event,
'tag' => $tag,
'error' => $e->getMessage(),
]);
}
}
}

View File

@@ -10,6 +10,8 @@ use Throwable;
/** /**
* 通过 Redis 列表在不同进程间投递 WebSocket 事件。 * 通过 Redis 列表在不同进程间投递 WebSocket 事件。
*
* 入队失败统一返回 false 并写 ws 日志runtime/logs/ws.log便于排查"为什么没有推送"。
*/ */
final class GameWebSocketEventBus final class GameWebSocketEventBus
{ {
@@ -18,7 +20,7 @@ final class GameWebSocketEventBus
/** /**
* @param array<string, mixed> $data * @param array<string, mixed> $data
* @return bool 是否成功入队false 表示 Redis 不可用或参数非法,调用方应避免标记已推送 * @return bool 是否成功入队false 表示 Redis 不可用或参数非法,调用方应避免标记"已推送"
*/ */
public static function publish(string $topic, array $data): bool public static function publish(string $topic, array $data): bool
{ {
@@ -34,12 +36,34 @@ final class GameWebSocketEventBus
]; ];
$json = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); $json = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
if (!is_string($json) || $json === '') { if (!is_string($json) || $json === '') {
Log::channel('ws')->warning('publish skip: invalid json payload', [
'topic' => $topic,
]);
return false; return false;
} }
try { try {
$len = Redis::lPush(self::KEY_QUEUE, $json); $len = Redis::lPush(self::KEY_QUEUE, $json);
return is_numeric($len) && (int) $len > 0; $ok = is_numeric($len) && (int) $len > 0;
if ($ok) {
$uid = filter_var($data['user_id'] ?? 0, FILTER_VALIDATE_INT);
Log::channel('ws')->info('publish', [
'topic' => $topic,
'user_id' => $uid === false ? 0 : (int) $uid,
'queue_len_after' => (int) $len,
'payload_size' => strlen($json),
]);
} else {
Log::channel('ws')->warning('publish lpush returned non-positive', [
'topic' => $topic,
'returned' => $len,
]);
}
return $ok;
} catch (Throwable $e) { } catch (Throwable $e) {
Log::channel('ws')->error('publish failed (redis exception)', [
'topic' => $topic,
'error' => $e->getMessage(),
]);
Log::warning('ws event bus publish failed', [ Log::warning('ws event bus publish failed', [
'topic' => $topic, 'topic' => $topic,
'error' => $e->getMessage(), 'error' => $e->getMessage(),
@@ -92,10 +116,27 @@ final class GameWebSocketEventBus
'server_time' => $serverTime, 'server_time' => $serverTime,
]; ];
} }
} catch (Throwable) { } catch (Throwable $e) {
Log::channel('ws')->error('popBatch failed (redis exception)', [
'error' => $e->getMessage(),
'popped' => count($out),
]);
return $out; return $out;
} }
return $out; return $out;
} }
/**
* 当前队列堆积长度(监控用)。
*/
public static function queueLength(): int
{
try {
$len = Redis::lLen(self::KEY_QUEUE);
return is_numeric($len) ? (int) $len : 0;
} catch (Throwable) {
return -1;
}
}
} }

View File

@@ -0,0 +1,186 @@
<?php
declare(strict_types=1);
namespace app\common\service;
/**
* 进程内 WebSocket 订阅注册表(仅在 gameWebSocketServer 单进程内使用)。
*
* - 维护双向索引:
* - topic => map<connection_id, true>
* - connection => [ topics: list<string>, user_id: int, last_seen_at: int, remote_ip: string ]
* - 分发器按 (topic) 直接拿到候选连接列表,按 (data.user_id) 过滤后再 send避免 O(N) 全遍历。
* - **必须** 与 GameWebSocketServer 同进程使用count=1不可水平扩展多 worker 间无法共享连接)。
*
* 该类不持有 TcpConnection 引用,仅持有 connection_id 与元数据Server 维护 connection_id => TcpConnection 映射。
*/
final class GameWebSocketSubscriptionRegistry
{
/** @var array<string, array<int, true>> topic => { connection_id: true } */
private static array $topicIndex = [];
/** @var array<int, array{topics: list<string>, user_id: int, last_seen_at: int, remote_ip: string}> */
private static array $connectionMeta = [];
/**
* 注册新连接onConnect 调用)。
*/
public static function registerConnection(int $connectionId, int $userId, string $remoteIp = ''): void
{
if ($connectionId <= 0) {
return;
}
self::$connectionMeta[$connectionId] = [
'topics' => [],
'user_id' => max(0, $userId),
'last_seen_at' => time(),
'remote_ip' => $remoteIp,
];
}
/**
* 注销连接onClose 调用):从所有 topic 索引中移除该 connection。
*/
public static function unregisterConnection(int $connectionId): void
{
if ($connectionId <= 0 || !isset(self::$connectionMeta[$connectionId])) {
return;
}
foreach (self::$connectionMeta[$connectionId]['topics'] as $topic) {
if (isset(self::$topicIndex[$topic][$connectionId])) {
unset(self::$topicIndex[$topic][$connectionId]);
if (self::$topicIndex[$topic] === []) {
unset(self::$topicIndex[$topic]);
}
}
}
unset(self::$connectionMeta[$connectionId]);
}
/**
* 替换该连接的订阅列表subscribe 报文调用,覆盖式订阅,符合现网协议)。
*
* @param list<string> $topics
* @return list<string> 实际生效的、去重排序后的订阅列表
*/
public static function replaceSubscriptions(int $connectionId, array $topics): array
{
if ($connectionId <= 0 || !isset(self::$connectionMeta[$connectionId])) {
return [];
}
foreach (self::$connectionMeta[$connectionId]['topics'] as $oldTopic) {
if (isset(self::$topicIndex[$oldTopic][$connectionId])) {
unset(self::$topicIndex[$oldTopic][$connectionId]);
if (self::$topicIndex[$oldTopic] === []) {
unset(self::$topicIndex[$oldTopic]);
}
}
}
$clean = [];
foreach ($topics as $t) {
if (!is_string($t)) {
continue;
}
$v = trim($t);
if ($v === '' || strlen($v) > 64) {
continue;
}
$clean[$v] = true;
}
$finalTopics = array_keys($clean);
sort($finalTopics);
self::$connectionMeta[$connectionId]['topics'] = $finalTopics;
foreach ($finalTopics as $topic) {
self::$topicIndex[$topic][$connectionId] = true;
}
return $finalTopics;
}
/**
* 获取订阅了指定 topic 的所有 connection_id。
*
* @return list<int>
*/
public static function connectionsForTopic(string $topic): array
{
$topic = trim($topic);
if ($topic === '' || !isset(self::$topicIndex[$topic])) {
return [];
}
return array_keys(self::$topicIndex[$topic]);
}
/**
* 标记连接活跃时间(接收任意消息时调用,用于心跳超时判断)。
*/
public static function touch(int $connectionId): void
{
if (isset(self::$connectionMeta[$connectionId])) {
self::$connectionMeta[$connectionId]['last_seen_at'] = time();
}
}
/**
* @return array{topics: list<string>, user_id: int, last_seen_at: int, remote_ip: string}|null
*/
public static function meta(int $connectionId): ?array
{
return self::$connectionMeta[$connectionId] ?? null;
}
public static function userIdOf(int $connectionId): int
{
return self::$connectionMeta[$connectionId]['user_id'] ?? 0;
}
/**
* 找出所有 last_seen_at 早于 $cutoff 的连接 id用于服务端主动关闭僵尸连接
*
* @return list<int>
*/
public static function staleConnections(int $cutoff): array
{
$out = [];
foreach (self::$connectionMeta as $cid => $meta) {
if (($meta['last_seen_at'] ?? 0) < $cutoff) {
$out[] = $cid;
}
}
return $out;
}
/**
* 当前活跃连接数(运维/诊断用)。
*/
public static function connectionCount(): int
{
return count(self::$connectionMeta);
}
/**
* 当前活跃订阅总数(运维/诊断用)。
*/
public static function subscriptionCount(): int
{
$sum = 0;
foreach (self::$topicIndex as $conns) {
$sum += count($conns);
}
return $sum;
}
/**
* 仅供测试/进程重启时清空索引。
*/
public static function reset(): void
{
self::$topicIndex = [];
self::$connectionMeta = [];
}
}

View File

@@ -4,26 +4,228 @@ declare(strict_types=1);
namespace app\process; namespace app\process;
use app\common\service\GameWebSocketEventBus;
use app\common\service\GameLiveService; use app\common\service\GameLiveService;
use app\common\service\GameWebSocketAuthHelper;
use app\common\service\GameWebSocketDispatcher;
use app\common\service\GameWebSocketEventBus;
use app\common\service\GameWebSocketPayloadHelper; use app\common\service\GameWebSocketPayloadHelper;
use app\common\service\GameWebSocketSubscriptionRegistry;
use support\Log;
use Throwable;
use Workerman\Connection\TcpConnection; use Workerman\Connection\TcpConnection;
use Workerman\Timer; use Workerman\Timer;
/** /**
* 后台测试页 WebSocket 服务(仅用于连接联调 * H5/后台 WebSocket 服务(重构版2026-05
*
* 设计与 docs/36字花-移动端接口设计草案.md §7 对齐:
*
* - 握手鉴权GameWebSocketAuthHelper
* - mobileURL Query `auth_token` + `user_token`,绑定 user_iduser 级主题按 user_id 过滤
* - admin URL Query `admin_ws_token`(后台 wsConfig 签发,写 Redis 短时签名user_id=0
* user 级主题不过滤(运维/联调可观测全量)
* - 客户端 -> 服务端:`{"action":"ping"}` / `{"action":"subscribe","topics":[...]}`
* - 服务端 -> 客户端:`ws.connected` / `ws.subscribed` / `pong` / `ws.error` / 业务事件帧
* - 主动心跳超时:连接 60s 内无任何上行报文(含 ping即被 server 主动 close
* 触发客户端重连,避免半关闭僵尸连接堵在分发表里
* - 事件分发由 GameWebSocketDispatcher 负责;每次 publish/consume/dispatch/send/订阅/关闭
* 都写入独立日志通道 runtime/logs/ws.log
*
* 高可用:
* - 消费 Timer 的 popBatch 异常已被 EventBus 内部捕获并记录,下次 tick 自动恢复
* - 任一连接 send 异常不会影响其它连接的下发Dispatcher 内单连接 try/catch
* - admin.live.snapshot 兜底直推 Timer 与队列消费解耦Redis 异常时后台仍能看到对局状态
*/ */
class GameWebSocketServer class GameWebSocketServer
{ {
/** @var array<int, TcpConnection> */ /** @var array<int, TcpConnection> connection_id => TcpConnection仅本进程内 */
private static array $connections = []; private static array $connections = [];
/** @var array<int, array<string, mixed>> connection_id => 鉴权元数据mode/admin_id 等,便于日志/排查) */
private static array $connectionAuth = [];
private static bool $eventBusConsumerStarted = false; private static bool $eventBusConsumerStarted = false;
private static bool $adminSnapshotTickerStarted = false; private static bool $adminSnapshotTickerStarted = false;
private static bool $heartbeatCheckerStarted = false;
/** 客户端 60s 内无任何上行报文(含 ping即被主动断开 */
private const HEARTBEAT_IDLE_SECONDS = 60;
/** 事件队列每 N ms 拉一次(默认 1s与文档 §7.1.3 一致) */
private const QUEUE_TICK_INTERVAL = 1;
/** 心跳超时检查间隔(秒) */
private const HEARTBEAT_CHECK_INTERVAL = 10;
public function onWorkerStart(): void
{
Log::channel('ws')->info('ws worker start', [
'pid' => function_exists('posix_getpid') ? posix_getpid() : getmypid(),
]);
self::ensureEventBusConsumer();
self::ensureAdminLiveSnapshotTicker();
self::ensureHeartbeatChecker();
}
public function onConnect(TcpConnection $connection): void
{
$connection->topics = [];
$connection->wsAuth = null;
self::$connections[$connection->id] = $connection;
Log::channel('ws')->debug('tcp onConnect', [
'connection_id' => $connection->id,
'remote' => method_exists($connection, 'getRemoteIp') ? $connection->getRemoteIp() : '',
]);
}
/** /**
* 从 Redis 队列拉取事件并推送给已订阅连接。 * Workerman 在 WebSocket 握手时回调;$request 可能是 stringHTTP raw或 Webman\Http\Request 对象,
* 部分环境下 WebSocket 进程的 onWorkerStart 可能未触发,因此在首帧握手处也会兜底启动一次(全局仅注册一个 Timer * 这里仅尝试从 connection 内置变量取 QueryString 做最大兼容
*/
public function onWebSocketConnect(TcpConnection $connection, mixed $request = null): void
{
self::ensureEventBusConsumer();
self::ensureAdminLiveSnapshotTicker();
self::ensureHeartbeatChecker();
$queryString = self::extractQueryString($connection, $request);
$query = GameWebSocketAuthHelper::parseQueryString($queryString);
$auth = GameWebSocketAuthHelper::authorize($query);
$remoteIp = self::remoteIp($connection);
if (!$auth['ok']) {
Log::channel('ws')->warning('handshake denied', [
'connection_id' => $connection->id,
'remote_ip' => $remoteIp,
'reason' => $auth['reason'],
'query_keys' => array_keys($query),
]);
GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [
'code' => 1101,
'message' => 'Authentication failed: ' . ($auth['reason'] ?: 'unauthorized'),
], 'handshake_denied');
try {
$connection->close();
} catch (Throwable) {
}
return;
}
$connection->wsAuth = $auth;
self::$connectionAuth[$connection->id] = $auth;
GameWebSocketSubscriptionRegistry::registerConnection($connection->id, (int) $auth['user_id'], $remoteIp);
Log::channel('ws')->info('handshake ok', [
'connection_id' => $connection->id,
'remote_ip' => $remoteIp,
'mode' => $auth['mode'],
'user_id' => $auth['user_id'],
'admin_id' => $auth['admin_id'],
]);
GameWebSocketDispatcher::sendDirect($connection, 'ws.connected', [
'message' => 'WebSocket connected',
'connection_id' => $connection->id,
'mode' => $auth['mode'],
'user_id' => $auth['user_id'],
'server_time' => time(),
'heartbeat_interval' => 30,
'idle_timeout' => self::HEARTBEAT_IDLE_SECONDS,
], 'handshake_ok');
}
public function onMessage(TcpConnection $connection, string $payload): void
{
GameWebSocketSubscriptionRegistry::touch($connection->id);
$decoded = json_decode($payload, true);
if (!is_array($decoded)) {
Log::channel('ws')->debug('invalid json payload', [
'connection_id' => $connection->id,
'payload_size' => strlen($payload),
]);
GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [
'message' => 'Invalid JSON payload',
], 'invalid_json');
return;
}
$action = isset($decoded['action']) && is_string($decoded['action']) ? trim($decoded['action']) : '';
if ($action === 'ping') {
GameWebSocketDispatcher::sendDirect($connection, 'pong', [
'server_time' => date('Y-m-d H:i:s'),
'server_time_ts' => time(),
], 'pong');
return;
}
if ($action === 'subscribe') {
$rawTopics = $decoded['topics'] ?? [];
$rawList = is_array($rawTopics) ? $rawTopics : [];
$finalTopics = GameWebSocketSubscriptionRegistry::replaceSubscriptions($connection->id, $rawList);
Log::channel('ws')->info('subscribe', [
'connection_id' => $connection->id,
'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($connection->id),
'topics' => $finalTopics,
'requested_count' => is_array($rawTopics) ? count($rawTopics) : 0,
]);
GameWebSocketDispatcher::sendDirect($connection, 'ws.subscribed', [
'topics' => $finalTopics,
], 'subscribed');
self::pushAdminTestOddsPreview($connection, $finalTopics);
return;
}
Log::channel('ws')->debug('unsupported action', [
'connection_id' => $connection->id,
'action' => $action,
]);
GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [
'message' => 'Unsupported action',
'received_action' => $action,
], 'unsupported_action');
}
public function onError(TcpConnection $connection, int $code, string $msg): void
{
Log::channel('ws')->warning('connection error', [
'connection_id' => $connection->id,
'code' => $code,
'detail' => $msg,
]);
try {
$connection->send(json_encode([
'event' => 'ws.error',
'message' => 'Server internal error',
'code' => $code,
'detail' => $msg,
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
} catch (Throwable) {
}
}
public function onClose(TcpConnection $connection): void
{
$auth = self::$connectionAuth[$connection->id] ?? [];
Log::channel('ws')->info('connection close', [
'connection_id' => $connection->id,
'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($connection->id),
'mode' => $auth['mode'] ?? '',
'remaining_connections' => max(0, GameWebSocketSubscriptionRegistry::connectionCount() - 1),
]);
GameWebSocketSubscriptionRegistry::unregisterConnection($connection->id);
unset(self::$connections[$connection->id], self::$connectionAuth[$connection->id]);
}
// ============================================================
// 内部Timer / 兜底
// ============================================================
/**
* 每秒拉取 Redis 队列事件并分发到对应订阅连接。
* popBatch 内部已捕获 Redis 异常并写 ws 日志Timer 不会因此停跑。
*/ */
private static function ensureEventBusConsumer(): void private static function ensureEventBusConsumer(): void
{ {
@@ -31,41 +233,36 @@ class GameWebSocketServer
return; return;
} }
self::$eventBusConsumerStarted = true; self::$eventBusConsumerStarted = true;
Timer::add(1, static function (): void { Timer::add(self::QUEUE_TICK_INTERVAL, static function (): void {
$events = GameWebSocketEventBus::popBatch(); try {
if ($events === []) { $events = GameWebSocketEventBus::popBatch();
return; if ($events === []) {
} return;
foreach ($events as $event) {
$topic = $event['topic'] ?? '';
if (!is_string($topic) || $topic === '') {
continue;
} }
$eventName = $event['event'] ?? $topic; foreach ($events as $event) {
$data = $event['data'] ?? []; try {
if (!is_array($data)) { GameWebSocketDispatcher::dispatch($event, self::$connections);
$data = []; } catch (Throwable $e) {
} Log::channel('ws')->error('dispatch loop exception', [
$serverTime = $event['server_time'] ?? time(); 'topic' => $event['topic'] ?? '',
foreach (self::$connections as $connection) { 'error' => $e->getMessage(),
$topics = $connection->topics ?? []; ]);
if (!is_array($topics) || !in_array($topic, $topics, true)) {
continue;
} }
$connection->send(json_encode([
'event' => $eventName,
'topic' => $topic,
'data' => $data,
'server_time' => $serverTime,
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
} }
} catch (Throwable $e) {
Log::channel('ws')->error('event bus consumer tick exception', [
'error' => $e->getMessage(),
]);
} }
}); });
Log::channel('ws')->info('event bus consumer started', [
'tick_interval_seconds' => self::QUEUE_TICK_INTERVAL,
]);
} }
/** /**
* 兜底直推:admin.live.snapshot 每秒主动构建并广播 * admin.live.snapshot 兜底:每秒构建一次快照直接推送(不依赖 Redis 队列)
* 目的:即使 Redis 队列不可用,也能保证 /admin/game/live 实时看到对局变化 * 用于 /admin/game/live 实时对局页,确保 Redis 异常时后台仍能看到对局状态
*/ */
private static function ensureAdminLiveSnapshotTicker(): void private static function ensureAdminLiveSnapshotTicker(): void
{ {
@@ -74,18 +271,18 @@ class GameWebSocketServer
} }
self::$adminSnapshotTickerStarted = true; self::$adminSnapshotTickerStarted = true;
Timer::add(1, static function (): void { Timer::add(1, static function (): void {
$hasAdminSubscriber = false; $cids = GameWebSocketSubscriptionRegistry::connectionsForTopic('admin.live.snapshot');
foreach (self::$connections as $connection) { if ($cids === []) {
$topics = $connection->topics ?? []; return;
if (is_array($topics) && in_array('admin.live.snapshot', $topics, true)) { }
$hasAdminSubscriber = true; try {
break; $snapshot = GameLiveService::buildSnapshot(null);
} } catch (Throwable $e) {
} Log::channel('ws')->error('admin snapshot build failed', [
if (!$hasAdminSubscriber) { 'error' => $e->getMessage(),
]);
return; return;
} }
$snapshot = GameLiveService::buildSnapshot(null);
$payload = json_encode([ $payload = json_encode([
'event' => 'admin.live.snapshot', 'event' => 'admin.live.snapshot',
'topic' => 'admin.live.snapshot', 'topic' => 'admin.live.snapshot',
@@ -95,106 +292,60 @@ class GameWebSocketServer
if (!is_string($payload) || $payload === '') { if (!is_string($payload) || $payload === '') {
return; return;
} }
foreach (self::$connections as $connection) { $sent = 0;
$topics = $connection->topics ?? []; $failed = 0;
if (!is_array($topics) || !in_array('admin.live.snapshot', $topics, true)) { foreach ($cids as $cid) {
if (!isset(self::$connections[$cid])) {
continue; continue;
} }
$connection->send($payload); try {
self::$connections[$cid]->send($payload);
$sent++;
} catch (Throwable $e) {
$failed++;
Log::channel('ws')->warning('admin snapshot send failed', [
'connection_id' => $cid,
'error' => $e->getMessage(),
]);
}
} }
}); });
} }
public function onWorkerStart(): void /**
* 心跳超时检查器:每 10s 扫描一次,关闭 60s 无上行报文的连接。
* 触发客户端重连,避免半关闭僵尸连接持续接收推送但不实际送达。
*/
private static function ensureHeartbeatChecker(): void
{ {
self::ensureEventBusConsumer(); if (self::$heartbeatCheckerStarted) {
self::ensureAdminLiveSnapshotTicker();
}
public function onConnect(TcpConnection $connection): void
{
$connection->topics = [];
self::$connections[$connection->id] = $connection;
}
public function onWebSocketConnect(TcpConnection $connection): void
{
self::ensureEventBusConsumer();
self::ensureAdminLiveSnapshotTicker();
$connection->send(json_encode([
'event' => 'ws.connected',
'message' => 'WebSocket connected',
'connection_id' => $connection->id,
'server_time' => time(),
'heartbeat_interval' => 30,
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
}
public function onMessage(TcpConnection $connection, string $payload): void
{
$decoded = json_decode($payload, true);
if (!is_array($decoded)) {
$connection->send(json_encode([
'event' => 'ws.error',
'message' => 'Invalid JSON payload',
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
return; return;
} }
self::$heartbeatCheckerStarted = true;
$action = $decoded['action'] ?? ''; Timer::add(self::HEARTBEAT_CHECK_INTERVAL, static function (): void {
if ($action === 'ping') { $cutoff = time() - self::HEARTBEAT_IDLE_SECONDS;
$connection->send(json_encode([ $stale = GameWebSocketSubscriptionRegistry::staleConnections($cutoff);
'event' => 'pong', if ($stale === []) {
'server_time' => date('Y-m-d H:i:s'), return;
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); }
return; foreach ($stale as $cid) {
} Log::channel('ws')->info('close idle connection', [
'connection_id' => $cid,
if ($action === 'subscribe') { 'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($cid),
$topics = $decoded['topics'] ?? []; 'idle_seconds' => self::HEARTBEAT_IDLE_SECONDS,
$sanitized = []; ]);
if (is_array($topics)) { if (isset(self::$connections[$cid])) {
foreach ($topics as $topic) { try {
if (!is_string($topic)) { self::$connections[$cid]->close();
continue; } catch (Throwable) {
} }
$value = trim($topic);
if ($value === '') {
continue;
}
$sanitized[] = $value;
} }
} }
$connection->topics = array_values(array_unique($sanitized)); });
$connection->send(json_encode([ Log::channel('ws')->info('heartbeat checker started', [
'event' => 'ws.subscribed', 'idle_seconds' => self::HEARTBEAT_IDLE_SECONDS,
'topics' => $connection->topics, 'check_interval' => self::HEARTBEAT_CHECK_INTERVAL,
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); ]);
self::pushAdminTestOddsPreview($connection, $connection->topics);
return;
}
$connection->send(json_encode([
'event' => 'ws.error',
'message' => 'Unsupported action',
'received_action' => $action,
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
}
public function onError(TcpConnection $connection, int $code, string $msg): void
{
$connection->send(json_encode([
'event' => 'ws.error',
'message' => 'Server internal error',
'code' => $code,
'detail' => $msg,
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
}
public function onClose(TcpConnection $connection): void
{
$connection->topics = [];
unset(self::$connections[$connection->id]);
} }
/** /**
@@ -210,12 +361,59 @@ class GameWebSocketServer
} }
$serverTime = time(); $serverTime = time();
foreach ($frames as $frame) { foreach ($frames as $frame) {
$connection->send(json_encode([ GameWebSocketDispatcher::sendDirect($connection, $frame['event'], [
'event' => $frame['event'],
'topic' => $frame['topic'], 'topic' => $frame['topic'],
'data' => $frame['data'], 'data' => $frame['data'],
'server_time' => $serverTime, 'server_time' => $serverTime,
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES)); ], 'admin_test_preview');
} }
} }
/**
* 兼容多种 Workerman 版本:尝试取握手 URI 的 query 部分。
* Workerman v5 中 onWebSocketConnect 第二参数已是 array $headers/$request这里尽量兼容字符串与对象。
*/
private static function extractQueryString(TcpConnection $connection, mixed $request): string
{
if (isset($connection->wsHandshakeQuery) && is_string($connection->wsHandshakeQuery)) {
return (string) $connection->wsHandshakeQuery;
}
if (isset($connection->onWebSocketConnect)) {
// noop
}
$serverUri = '';
if (isset($_SERVER['REQUEST_URI']) && is_string($_SERVER['REQUEST_URI'])) {
$serverUri = $_SERVER['REQUEST_URI'];
}
if ($serverUri === '' && isset($connection->headers) && is_array($connection->headers)) {
$line = $connection->headers['get'] ?? $connection->headers['GET'] ?? '';
if (is_string($line)) {
$serverUri = $line;
}
}
if ($serverUri === '' && is_string($request)) {
if (preg_match('#^GET\s+([^\s]+)#i', $request, $m) === 1) {
$serverUri = $m[1];
}
}
if ($serverUri === '' && is_object($request) && method_exists($request, 'queryString')) {
$serverUri = '?' . (string) $request->queryString();
}
$qPos = strpos($serverUri, '?');
if ($qPos === false) {
return '';
}
return substr($serverUri, $qPos + 1);
}
private static function remoteIp(TcpConnection $connection): string
{
try {
if (method_exists($connection, 'getRemoteIp')) {
return (string) $connection->getRemoteIp();
}
} catch (Throwable) {
}
return '';
}
} }

View File

@@ -29,4 +29,26 @@ return [
] ]
], ],
], ],
/**
* 游戏 WebSocket 推送链路专用日志(独立通道):
* - 写入 runtime/logs/ws-YYYY-MM-DD.log保留 7 天
* - 通过 Log::channel('ws')->info(...) 访问
* - 记录维度publish 入队、queue 消费、按 topic/user_id 分发、心跳与连接生命周期、握手鉴权失败等
*/
'ws' => [
'handlers' => [
[
'class' => Monolog\Handler\RotatingFileHandler::class,
'constructor' => [
runtime_path() . '/logs/ws.log',
7,
Monolog\Logger::DEBUG,
],
'formatter' => [
'class' => Monolog\Formatter\LineFormatter::class,
'constructor' => [null, 'Y-m-d H:i:s', true],
],
],
],
],
]; ];

View File

@@ -764,22 +764,35 @@
- **移动端配置缺口****`POST /api/game/lobbyInit` 当前不下发 WebSocket 地址**H5 需与运维约定同一套 `H5_WEBSOCKET_URL`(打包进前端配置、远程配置中心等),与 HTTP API 基址可不同域。 - **移动端配置缺口****`POST /api/game/lobbyInit` 当前不下发 WebSocket 地址**H5 需与运维约定同一套 `H5_WEBSOCKET_URL`(打包进前端配置、远程配置中心等),与 HTTP API 基址可不同域。
- **混合内容**:若 H5 页面为 **HTTPS**,浏览器要求 WebSocket 使用 **`wss://`**,否则会被拦截。 - **混合内容**:若 H5 页面为 **HTTPS**,浏览器要求 WebSocket 使用 **`wss://`**,否则会被拦截。
- **事件投递依赖 Redis**HTTP 侧业务通过 **`GameWebSocketEventBus`**Redis 列表)将事件投递到 WebSocket 进程Redis 不可用或队列异常时,**除 `admin.live.snapshot` 外**的广播类推送可能收不到。后台若订阅了 `admin.live.snapshot`,服务端有**每秒直连构建快照**的兜底,不依赖队列。 - **事件投递依赖 Redis**HTTP 侧业务通过 **`GameWebSocketEventBus`**Redis 列表)将事件投递到 WebSocket 进程Redis 不可用或队列异常时,**除 `admin.live.snapshot` 外**的广播类推送可能收不到。后台若订阅了 `admin.live.snapshot`,服务端有**每秒直连构建快照**的兜底,不依赖队列。
- **鉴权(重要)****当前 `GameWebSocketServer` 在握手阶段不校验** URL Query 中的 `token` / `auth_token` / `user-token` 等;**任何人拿到地址即可建立连接**(与 §1 HTTP 接口必须 `auth-token` + `user-token` 不同。Query 中上述参数为**预留/习惯写法**,便于后续若要在 `onWebSocketConnect` 中实现鉴权再与 HTTP 对齐;**文档中若写「未登录返回 1101」属规划口径非现网行为**。 - **握手鉴权(2026-05 重构后强制)**`GameWebSocketServer::onWebSocketConnect` 通过 `GameWebSocketAuthHelper::authorize` 校验 URL Query。两种合法身份
- **mobileH5/移动端)**:必须同时携带 `auth_token`(同 HTTP `auth-token`+ `user_token`(同 HTTP `user-token`,亦支持 `token` 同义)。校验通过后连接被绑定 `user_id`,分发器仅向其推送本人的 user 级主题。
- **admin后台/运维)**:必须携带 `admin_ws_token`(由后台 `wsConfig` 接口签发,写入 Redis Key `dfw:v1:ws:admin_token:{token}`,默认 TTL 7200s。后台已 `wsConfig` 中把该 token 拼到 `ws_url` 一并返回前端透传即可admin 模式 `user_id=0`,可订阅任意主题并收到**全量** user 级推送(运维联调用)。
- 任一身份不通过 → 服务端发送 `{"event":"ws.error","code":1101,"message":"Authentication failed: ..."}` 并立即 `close`
- **服务端按 user_id 过滤user 级主题)**:以下 topic 的 `data.user_id` 必须 **等于** 当前连接绑定的 `user_id` 才会下发——**`bet.win` / `user.streak` / `wallet.changed` / `bet.accepted` / `auto.spin.progress`**。其它 topic`period.tick` / `period.opened` / `jackpot.hit` / `admin.*`按订阅广播。admin 模式不参与此过滤。
- **心跳超时(服务端主动)**:连接 60s 内无任何上行报文(含 `ping`/`subscribe`)即被 server 主动 `close`,触发客户端走重连流程;避免半关闭的僵尸连接长期持有订阅却不能实际送达推送。
- **独立日志通道 `ws`**`runtime/logs/ws.log`(保留 7 天)。记录维度包含 `publish 入队 / popBatch 异常 / dispatchtopic/candidates/matched/skipped_not_owner/skipped_closed/send_failed/ handshake_ok | denied / subscribe / pong / close idle / send failed` 等。排查"为什么没收到推送"时优先看此文件。
- **订阅才有业务推送**:建连后仅会收到握手首帧(见下)及本连接已订阅主题的消息;不发送 `subscribe` 则收不到 `period.tick` 等(`admin.live.snapshot` 同上,需显式订阅)。 - **订阅才有业务推送**:建连后仅会收到握手首帧(见下)及本连接已订阅主题的消息;不发送 `subscribe` 则收不到 `period.tick` 等(`admin.live.snapshot` 同上,需显式订阅)。
### 7.1 WebSocket 连接与消息 ### 7.1 WebSocket 连接与消息
- **连接地址**:见 **§7.0**(环境变量 `H5_WEBSOCKET_URL` 或后台 `wsConfig` 返回的 `ws_url` - **连接地址**:见 **§7.0**(环境变量 `H5_WEBSOCKET_URL` 或后台 `wsConfig` 返回的 `ws_url`
- **客户端**:浏览器原生 `WebSocket``ws://` / `wss://` - **客户端**:浏览器原生 `WebSocket``ws://` / `wss://`
- **连接时携带参数(可选 / 预留** - **连接时必带 Query 参数2026-05 起强制**
- URL Query 可带 `token`(与 HTTP 头 `user-token` 同义习惯)、`auth_token`(与 HTTP 头 `auth-token` 同义习惯)、`device_id``lang` 等,**当前服务端不解析、不校验**;若后续版本实现握手鉴权,以发布说明为准 - **H5/移动端**`auth_token=<HTTP auth-token>` + `user_token=<HTTP user-token>`(亦支持 `token` 同义)。`device_id``lang` 仍可携带,但服务端不强制
- 示例(习惯写法):`wss://ws.example.com/ws?token=xxx&auth_token=xxx&device_id=ios_001&lang=zh` - **后台**`admin_ws_token=<wsConfig 返回的 admin_ws_token>`(后台 `wsConfig` 已直接把它拼到 `ws_url`,前端透传即可)。
- 示例:
- H5`wss://ws.example.com/ws/?auth_token=xxx&user_token=yyy&device_id=ios_001&lang=zh`
- 后台:`wss://ws.example.com/ws/?admin_ws_token=zzz`
- 缺失任一必填字段或 token 失效 → 服务端回 `{"event":"ws.error","code":1101,...}` 后立即关闭连接。
- **连接成功首帧(当前实现)** - **连接成功首帧(当前实现)**
- `event``ws.connected` - `event``ws.connected`
- `message`:固定文案 `WebSocket connected`(便于联调日志) - `message`:固定文案 `WebSocket connected`(便于联调日志)
- `connection_id`:连接唯一标识(进程内) - `connection_id`:连接唯一标识(进程内)
- `mode``mobile` | `admin`2026-05 新增;表明本连接的鉴权身份)
- `user_id`int2026-05 新增mobile 模式为真实玩家 idadmin 模式为 0
- `server_time`:服务器时间戳(**秒**int - `server_time`:服务器时间戳(**秒**int
- `heartbeat_interval`:建议心跳间隔(**秒**,当前实现固定为 `30` - `heartbeat_interval`:建议心跳间隔(**秒**,当前实现固定为 `30`
- `idle_timeout`:服务端主动关闭的空闲秒数(**秒**,当前实现固定为 `60`;客户端 `idle_timeout - 心跳间隔` 内必须发出 `ping`,否则会被 server 主动 `close`
- **连接后错误帧(当前实现,非 HTTP 业务码)** - **连接后错误帧(当前实现,非 HTTP 业务码)**
- JSON 无法解析:`event`=`ws.error``message`=`Invalid JSON payload`(无 `code` 或与 HTTP `code` 不同体系) - JSON 无法解析:`event`=`ws.error``message`=`Invalid JSON payload`(无 `code` 或与 HTTP `code` 不同体系)
- 未知 `action``event`=`ws.error``message`=`Unsupported action`,并可能带 `received_action` - 未知 `action``event`=`ws.error``message`=`Unsupported action`,并可能带 `received_action`
@@ -807,8 +820,10 @@
#### 7.1.2 订阅行为说明 #### 7.1.2 订阅行为说明
- **仅建立连接不会自动下发全部业务消息**;客户端需要发送 `subscribe` 明确订阅主题。 - **仅建立连接不会自动下发全部业务消息**;客户端需要发送 `subscribe` 明确订阅主题。
- 成功订阅后服务端返回:`{"event":"ws.subscribed","topics":[...]}` - 成功订阅后服务端返回:`{"event":"ws.subscribed","topics":[...]}`(已去重、按字典序排序,与提交顺序无关)
- **`subscribe` 覆盖式生效**:每次发送都会**完全替换**该连接的订阅集合(不是累加)。需要追加请把已有列表一并发上来。
- 若未订阅主题,通常只能收到握手首帧(`ws.connected`)和心跳回包(`pong`)。 - 若未订阅主题,通常只能收到握手首帧(`ws.connected`)和心跳回包(`pong`)。
- **服务端按 user_id 过滤**mobile 模式连接只会收到 `data.user_id == 自己 user_id` 的 user 级主题(见 §7.0 列表admin 模式不过滤,收到全量。**客户端仍应做一次防御性 `user_id` 过滤**,避免后续接口变更带来误处理。
- **不下发** `streak_win_reward` 全表110 档);赔率仅通过 `user.streak` / `wallet.changed` / `bet.accepted``lobbyInit.user_snapshot` 推送**当前登录玩家**本局适用字段。 - **不下发** `streak_win_reward` 全表110 档);赔率仅通过 `user.streak` / `wallet.changed` / `bet.accepted``lobbyInit.user_snapshot` 推送**当前登录玩家**本局适用字段。
#### 7.1.2A 连胜赔率与连胜场次WebSocket #### 7.1.2A 连胜赔率与连胜场次WebSocket

View File

@@ -0,0 +1,59 @@
<?php
declare(strict_types=1);
/**
* 本地烟雾测试:验证 GameWebSocketAuthHelper / Registry / Dispatcher 关键路径。
* 不依赖 webman 进程,可直接 `php scripts/smoke_ws_auth_helper.php` 跑。
*/
require __DIR__ . '/../vendor/autoload.php';
require __DIR__ . '/../support/bootstrap.php';
use app\common\service\GameWebSocketAuthHelper;
use app\common\service\GameWebSocketSubscriptionRegistry;
echo "[1] parseQueryString\n";
$q = GameWebSocketAuthHelper::parseQueryString('?auth_token=AAA&user_token=BBB&device_id=ios&extra=1');
echo json_encode($q, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), PHP_EOL, PHP_EOL;
echo "[2] authorize: missing auth-token (mobile)\n";
$r = GameWebSocketAuthHelper::authorize(['user_token' => 'whatever']);
echo json_encode($r, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), PHP_EOL, PHP_EOL;
echo "[3] authorize: invalid auth-token\n";
$r = GameWebSocketAuthHelper::authorize(['auth_token' => 'not_existing_token', 'user_token' => 'not_existing_user_token']);
echo json_encode($r, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), PHP_EOL, PHP_EOL;
echo "[4] authorize: invalid admin_ws_token bypass\n";
$r = GameWebSocketAuthHelper::authorize(['admin_ws_token' => 'no_such_admin_token_xxxxxx']);
echo json_encode($r, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), PHP_EOL, PHP_EOL;
echo "[5] issue + validate admin_ws_token\n";
$issued = GameWebSocketAuthHelper::issueAdminWsToken(99, 60);
echo 'issued: ', json_encode($issued, JSON_UNESCAPED_UNICODE), PHP_EOL;
if (($issued['token'] ?? '') !== '') {
$r = GameWebSocketAuthHelper::authorize(['admin_ws_token' => $issued['token']]);
echo 'authorize ok with issued token: ', json_encode($r, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT), PHP_EOL;
} else {
echo "WARN: issueAdminWsToken returned empty (Redis 不可用?)\n";
}
echo PHP_EOL;
echo "[6] SubscriptionRegistry\n";
GameWebSocketSubscriptionRegistry::reset();
GameWebSocketSubscriptionRegistry::registerConnection(10, 14, '127.0.0.1');
GameWebSocketSubscriptionRegistry::registerConnection(11, 0, '127.0.0.1');
$t1 = GameWebSocketSubscriptionRegistry::replaceSubscriptions(10, ['bet.win', 'period.opened', 'period.opened', '', ' ', 'bet.win']);
$t2 = GameWebSocketSubscriptionRegistry::replaceSubscriptions(11, ['admin.live.snapshot', 'period.opened']);
echo 'cid=10 topics=', json_encode($t1), PHP_EOL;
echo 'cid=11 topics=', json_encode($t2), PHP_EOL;
echo 'period.opened subscribers=', json_encode(GameWebSocketSubscriptionRegistry::connectionsForTopic('period.opened')), PHP_EOL;
echo 'bet.win subscribers=', json_encode(GameWebSocketSubscriptionRegistry::connectionsForTopic('bet.win')), PHP_EOL;
echo 'admin.live.snapshot subscribers=', json_encode(GameWebSocketSubscriptionRegistry::connectionsForTopic('admin.live.snapshot')), PHP_EOL;
GameWebSocketSubscriptionRegistry::unregisterConnection(10);
echo 'after unregister(10), bet.win subscribers=', json_encode(GameWebSocketSubscriptionRegistry::connectionsForTopic('bet.win')), PHP_EOL;
echo 'after unregister(10), period.opened subscribers=', json_encode(GameWebSocketSubscriptionRegistry::connectionsForTopic('period.opened')), PHP_EOL;
echo 'remaining connection count=', GameWebSocketSubscriptionRegistry::connectionCount(), PHP_EOL;
echo "\nALL DONE\n";

View File

@@ -463,7 +463,16 @@ function connectWs(): void {
socket.onclose = () => { socket.onclose = () => {
wsConnected.value = false wsConnected.value = false
wsClient.value = null wsClient.value = null
window.setTimeout(() => { // 断线后:先刷新 wsConfig 拿新的 admin_ws_token避免握手 token 已过期反复失败),再重连
window.setTimeout(async () => {
if (wsConnected.value) {
return
}
try {
await reloadWsConfig()
} catch {
/* ignore下次重连时再试 */
}
if (!wsConnected.value) { if (!wsConnected.value) {
connectWs() connectWs()
} }