470 lines
18 KiB
PHP
470 lines
18 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace app\process;
|
||
|
||
use app\common\service\GameLiveService;
|
||
use app\common\service\GameWebSocketAuthHelper;
|
||
use app\common\service\GameWebSocketDispatcher;
|
||
use app\common\service\GameWebSocketEventBus;
|
||
use app\common\service\GameWebSocketPayloadHelper;
|
||
use app\common\service\GameWebSocketSubscriptionRegistry;
|
||
use support\Log;
|
||
use Throwable;
|
||
use Workerman\Connection\TcpConnection;
|
||
use Workerman\Timer;
|
||
|
||
/**
|
||
* H5/后台 WebSocket 服务(重构版,2026-05)
|
||
*
|
||
* 设计与 docs/36字花-移动端接口设计草案.md §7 对齐:
|
||
*
|
||
* - 握手鉴权(GameWebSocketAuthHelper)
|
||
* - mobile:URL Query **`auth-token`** + **`user-token`**(与 HTTP 头一致),绑定 user_id
|
||
* - admin: URL Query **`admin-ws-token`**(后台 wsConfig 签发),user_id=0,
|
||
* user 级主题不过滤(运维/联调可观测全量)
|
||
* - 客户端 -> 服务端:`{"action":"ping"}` / `{"action":"subscribe","topics":[...]}`
|
||
* - 服务端 -> 客户端:`ws.connected` / `ws.subscribed` / `pong` / `ws.error` / 业务事件帧
|
||
* - 主动心跳超时:连接 60s 内无任何上行报文(含 ping)即被 server 主动 close,
|
||
* 触发客户端重连,避免半关闭僵尸连接堵在分发表里
|
||
* - 事件分发由 GameWebSocketDispatcher 负责;每次 publish/consume/dispatch/send/订阅/关闭
|
||
* 都写入独立日志通道 runtime/logs/ws.log
|
||
*
|
||
* 高可用:
|
||
* - 消费 Timer 的 popBatch 异常已被 EventBus 内部捕获并记录,下次 tick 自动恢复
|
||
* - 任一连接 send 异常不会影响其它连接的下发(Dispatcher 内单连接 try/catch)
|
||
* - admin.live.snapshot 兜底直推 Timer 与队列消费解耦,Redis 异常时后台仍能看到对局状态
|
||
*/
|
||
class GameWebSocketServer
|
||
{
|
||
/** @var array<int, TcpConnection> connection_id => TcpConnection(仅本进程内) */
|
||
private static array $connections = [];
|
||
|
||
/** @var array<int, array<string, mixed>> connection_id => 鉴权元数据(mode/admin_id 等,便于日志/排查) */
|
||
private static array $connectionAuth = [];
|
||
|
||
private static bool $eventBusConsumerStarted = false;
|
||
private static bool $adminSnapshotTickerStarted = false;
|
||
private static bool $heartbeatCheckerStarted = false;
|
||
/** @var bool */
|
||
private static bool $liveStateTickerStarted = false;
|
||
|
||
/** 客户端 60s 内无任何上行报文(含 ping)即被主动断开 */
|
||
private const HEARTBEAT_IDLE_SECONDS = 60;
|
||
|
||
/** Event-queue poll interval in seconds (default 1s; consistent with doc §7.1.3). Passed directly to Timer::add(). */
|
||
private const QUEUE_TICK_INTERVAL = 1;
|
||
|
||
/** 心跳超时检查间隔(秒) */
|
||
private const HEARTBEAT_CHECK_INTERVAL = 10;
|
||
|
||
/**
 * Worker bootstrap hook: logs the worker PID, then starts the event-bus consumer,
 * the admin live-snapshot ticker and the heartbeat checker.
 *
 * NOTE(review): the live-state ticker is not started here; within this class it is
 * only started from onWebSocketConnect() — confirm that is intentional.
 */
public function onWorkerStart(): void
{
    // posix_getpid() is only available with the posix extension; fall back to getmypid().
    $pid = function_exists('posix_getpid') ? posix_getpid() : getmypid();
    Log::channel('ws')->info('ws worker start', ['pid' => $pid]);

    self::ensureEventBusConsumer();
    self::ensureAdminLiveSnapshotTicker();
    self::ensureHeartbeatChecker();
}
|
||
|
||
/**
 * TCP-level connect hook (fires before the WebSocket handshake).
 *
 * Initialises the per-connection scratch properties and records the connection in
 * the process-local connection table so timers and the dispatcher can reach it.
 *
 * @param TcpConnection $connection The freshly accepted connection.
 */
public function onConnect(TcpConnection $connection): void
{
    $connection->topics = [];
    $connection->wsAuth = null;
    self::$connections[$connection->id] = $connection;

    Log::channel('ws')->debug('tcp onConnect', [
        'connection_id' => $connection->id,
        // Use the shared helper (same lookup as the handshake path, and it never throws).
        'remote' => self::remoteIp($connection),
    ]);
}
|
||
|
||
/**
 * WebSocket handshake callback: authenticates the client from the handshake query string.
 *
 * Depending on the Workerman version, $request may be a raw HTTP string or a request
 * object; extractQueryString() resolves the query string as compatibly as it can.
 *
 * - Denied: logs a warning, sends a `ws.error` frame (code 1101) and closes the connection.
 * - Accepted: stores the auth result on the connection and in the process-local auth
 *   table, registers the connection with the subscription registry, and replies with
 *   `ws.connected`.
 *
 * @param TcpConnection $connection Connection performing the handshake.
 * @param mixed         $request    Handshake request as provided by Workerman (may be null).
 */
public function onWebSocketConnect(TcpConnection $connection, mixed $request = null): void
{
    // Each ensure*() call is guarded by a static flag, so timers start at most once per process.
    self::ensureEventBusConsumer();
    self::ensureAdminLiveSnapshotTicker();
    self::ensureHeartbeatChecker();
    self::ensureLiveStateTicker();

    $query = GameWebSocketAuthHelper::parseQueryString(
        self::extractQueryString($connection, $request)
    );
    $auth = GameWebSocketAuthHelper::authorize($query);
    $remoteIp = self::remoteIp($connection);
    $wsLog = Log::channel('ws');

    if (!$auth['ok']) {
        // Only the query keys are logged, never the token values.
        $wsLog->warning('handshake denied', [
            'connection_id' => $connection->id,
            'remote_ip' => $remoteIp,
            'reason' => $auth['reason'],
            'query_keys' => array_keys($query),
        ]);

        $errorFrame = [
            'code' => 1101,
            'message' => 'Authentication failed: ' . ($auth['reason'] ?: 'unauthorized'),
        ];
        GameWebSocketDispatcher::sendDirect($connection, 'ws.error', $errorFrame, 'handshake_denied');

        try {
            $connection->close();
        } catch (Throwable) {
            // Best effort: the connection may already be gone.
        }

        return;
    }

    $connection->wsAuth = $auth;
    self::$connectionAuth[$connection->id] = $auth;

    // The registry gets a normalised mode; anything that is not a string falls back to 'mobile'.
    $registryMode = is_string($auth['mode'] ?? null) ? (string) $auth['mode'] : 'mobile';
    GameWebSocketSubscriptionRegistry::registerConnection(
        $connection->id,
        (int) $auth['user_id'],
        $registryMode,
        $remoteIp
    );

    $wsLog->info('handshake ok', [
        'connection_id' => $connection->id,
        'remote_ip' => $remoteIp,
        'mode' => $auth['mode'],
        'user_id' => $auth['user_id'],
        'admin_id' => $auth['admin_id'],
    ]);

    $welcome = [
        'message' => 'WebSocket connected',
        'mode' => $auth['mode'],
        'server_time' => time(),
        'heartbeat_interval' => 30,
        'idle_timeout' => self::HEARTBEAT_IDLE_SECONDS,
    ];
    GameWebSocketDispatcher::sendDirect($connection, 'ws.connected', $welcome, 'handshake_ok');
}
|
||
|
||
/**
 * Handles one inbound frame from a client.
 *
 * Every frame, valid or not, first calls the registry's touch() for this connection
 * (presumably refreshing the idle marker used by the heartbeat checker — confirm in
 * GameWebSocketSubscriptionRegistry).
 *
 * Supported actions:
 *  - `ping`:      replies `pong` with the formatted server time and a Unix timestamp.
 *  - `subscribe`: merges the requested topics into the current set, or replaces the set
 *                 when `replace` is truthy or `mode` equals "replace"; replies
 *                 `ws.subscribed`; admin connections additionally receive demo frames.
 * Non-JSON payloads and any other action are answered with `ws.error`.
 *
 * @param TcpConnection $connection Sender of the frame.
 * @param string        $payload    Raw frame payload (expected to be a JSON object).
 */
public function onMessage(TcpConnection $connection, string $payload): void
{
    $cid = $connection->id;
    GameWebSocketSubscriptionRegistry::touch($cid);

    $message = json_decode($payload, true);
    if (!is_array($message)) {
        // Log only the size, not the payload itself.
        Log::channel('ws')->debug('invalid json payload', [
            'connection_id' => $cid,
            'payload_size' => strlen($payload),
        ]);
        GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [
            'message' => 'Invalid JSON payload',
        ], 'invalid_json');

        return;
    }

    $action = '';
    if (isset($message['action']) && is_string($message['action'])) {
        $action = trim($message['action']);
    }

    if ($action !== 'ping' && $action !== 'subscribe') {
        Log::channel('ws')->debug('unsupported action', [
            'connection_id' => $cid,
            'action' => $action,
        ]);
        GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [
            'message' => 'Unsupported action',
            'received_action' => $action,
        ], 'unsupported_action');

        return;
    }

    if ($action === 'ping') {
        GameWebSocketDispatcher::sendDirect($connection, 'pong', [
            'server_time' => date('Y-m-d H:i:s'),
            'server_time_ts' => time(),
        ], 'pong');

        return;
    }

    // --- subscribe ---
    $requested = $message['topics'] ?? [];
    if (!is_array($requested)) {
        $requested = [];
    }

    $replace = !empty($message['replace'])
        || (isset($message['mode']) && is_string($message['mode']) && trim($message['mode']) === 'replace');

    // Capture the topic set before it is changed, for the audit log.
    $metaBefore = GameWebSocketSubscriptionRegistry::meta($cid);
    $topicsBefore = is_array($metaBefore) ? ($metaBefore['topics'] ?? []) : [];

    if ($replace) {
        $topicsAfter = GameWebSocketSubscriptionRegistry::replaceSubscriptions($cid, $requested);
    } else {
        $topicsAfter = GameWebSocketSubscriptionRegistry::mergeSubscriptions($cid, $requested);
    }

    Log::channel('ws')->info('subscribe', [
        'connection_id' => $cid,
        'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($cid),
        'replace' => $replace,
        'before_topics' => $topicsBefore,
        'topics' => $topicsAfter,
        'requested_count' => count($requested),
    ]);

    GameWebSocketDispatcher::sendDirect($connection, 'ws.subscribed', [
        'topics' => $topicsAfter,
    ], 'subscribed');

    // Demo frames are for admin integration testing only; mobile users must not receive
    // the sample player's bet.accepted / wallet.changed frames after subscribing.
    if (GameWebSocketSubscriptionRegistry::isAdmin($cid)) {
        self::pushAdminTestOddsPreview($connection, $topicsAfter);
    }
}
|
||
|
||
/**
 * Connection error hook: logs the error and, when it is safe, notifies the client.
 *
 * Workerman invokes onError from inside TcpConnection::send() when a send fails
 * (send buffer full / peer already closed). Replying with send() in that situation
 * re-enters this handler and can recurse without bound, so no frame is sent for
 * send-failure errors, and a per-connection guard prevents any other re-entrancy.
 *
 * @param TcpConnection $connection Connection that raised the error.
 * @param int           $code       Workerman error code.
 * @param string        $msg        Workerman error description.
 */
public function onError(TcpConnection $connection, int $code, string $msg): void
{
    /** @var array<int, true> $notifying connection_id => currently sending the error frame */
    static $notifying = [];

    Log::channel('ws')->warning('connection error', [
        'connection_id' => $connection->id,
        'code' => $code,
        'detail' => $msg,
    ]);

    // 2 is Workerman's send-fail code (WORKERMAN_SEND_FAIL in v4, ConnectionInterface::SEND_FAIL in v5).
    // The literal is used because the constant's location differs between versions.
    $isSendFailure = $code === 2;
    if ($isSendFailure || isset($notifying[$connection->id])) {
        return;
    }

    $frame = json_encode([
        'event' => 'ws.error',
        'message' => 'Server internal error',
        'code' => $code,
        'detail' => $msg,
    ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
    if (!is_string($frame)) {
        // Encoding failed (e.g. invalid UTF-8 in $msg); nothing sensible to send.
        return;
    }

    $notifying[$connection->id] = true;
    try {
        $connection->send($frame);
    } catch (Throwable) {
        // Best effort: the connection is already in an error state.
    } finally {
        unset($notifying[$connection->id]);
    }
}
|
||
|
||
/**
 * Connection close hook: removes the connection from the subscription registry and
 * from the process-local tables, then logs the close.
 *
 * Cleanup runs before logging so that a logging failure cannot leave stale entries
 * behind. The remaining-connection count is read after unregistering, which keeps it
 * accurate for connections that never completed the handshake (and therefore were
 * never registered).
 *
 * @param TcpConnection $connection The connection being closed.
 */
public function onClose(TcpConnection $connection): void
{
    $cid = $connection->id;

    // Read identity data first; it is looked up through the registry entry removed below.
    $mode = (self::$connectionAuth[$cid] ?? [])['mode'] ?? '';
    $userId = GameWebSocketSubscriptionRegistry::userIdOf($cid);

    GameWebSocketSubscriptionRegistry::unregisterConnection($cid);
    unset(self::$connections[$cid], self::$connectionAuth[$cid]);

    Log::channel('ws')->info('connection close', [
        'connection_id' => $cid,
        'user_id' => $userId,
        'mode' => $mode,
        'remaining_connections' => max(0, GameWebSocketSubscriptionRegistry::connectionCount()),
    ]);
}
|
||
|
||
// ============================================================
|
||
// 内部:Timer / 兜底
|
||
// ============================================================
|
||
|
||
/**
 * Starts (once per process) the timer that drains the event queue and dispatches each
 * event to the subscribed connections of this process.
 *
 * Failures are isolated at two levels: a failing dispatch is logged and the remaining
 * events of the batch are still processed; any other failure in a tick is logged and
 * the timer keeps running.
 */
private static function ensureEventBusConsumer(): void
{
    if (self::$eventBusConsumerStarted) {
        return;
    }
    self::$eventBusConsumerStarted = true;

    $drainQueue = static function (): void {
        try {
            $batch = GameWebSocketEventBus::popBatch();
            if ($batch === []) {
                return;
            }

            foreach ($batch as $event) {
                try {
                    GameWebSocketDispatcher::dispatch($event, self::$connections);
                } catch (Throwable $dispatchError) {
                    Log::channel('ws')->error('dispatch loop exception', [
                        'topic' => $event['topic'] ?? '',
                        'error' => $dispatchError->getMessage(),
                    ]);
                }
            }
        } catch (Throwable $tickError) {
            Log::channel('ws')->error('event bus consumer tick exception', [
                'error' => $tickError->getMessage(),
            ]);
        }
    };

    Timer::add(self::QUEUE_TICK_INTERVAL, $drainQueue);

    Log::channel('ws')->info('event bus consumer started', [
        'tick_interval_seconds' => self::QUEUE_TICK_INTERVAL,
    ]);
}
|
||
|
||
/**
 * Fallback push for `admin.live.snapshot`: starts (once per process) a 1-second timer
 * that builds a snapshot and sends it directly to the subscribed connections, without
 * going through the event queue.
 *
 * Serves the /admin/game/live page so the back office can still see round state when
 * the queue backend is unavailable.
 *
 * The whole tick is wrapped in a try/catch so that no exception (registry lookup,
 * sanitising, encoding, logging) can escape the timer callback; a failed send to one
 * connection does not affect the others.
 */
private static function ensureAdminLiveSnapshotTicker(): void
{
    if (self::$adminSnapshotTickerStarted) {
        return;
    }
    self::$adminSnapshotTickerStarted = true;

    Timer::add(1, static function (): void {
        try {
            $cids = GameWebSocketSubscriptionRegistry::connectionsForTopic('admin.live.snapshot');
            if ($cids === []) {
                // Nobody is watching: skip building the snapshot altogether.
                return;
            }

            try {
                $snapshot = GameLiveService::buildSnapshot(null);
            } catch (Throwable $e) {
                Log::channel('ws')->error('admin snapshot build failed', [
                    'error' => $e->getMessage(),
                ]);

                return;
            }

            // Encode once and reuse the same payload for every subscriber.
            $payload = json_encode([
                'event' => 'admin.live.snapshot',
                'topic' => 'admin.live.snapshot',
                'data' => GameWebSocketPayloadHelper::sanitizeOutboundData($snapshot),
                'server_time' => time(),
            ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
            if (!is_string($payload) || $payload === '') {
                Log::channel('ws')->error('admin snapshot encode failed', [
                    'error' => json_last_error_msg(),
                ]);

                return;
            }

            foreach ($cids as $cid) {
                $target = self::$connections[$cid] ?? null;
                if ($target === null) {
                    // Subscribed in the registry but not held by this process.
                    continue;
                }
                try {
                    $target->send($payload);
                } catch (Throwable $e) {
                    Log::channel('ws')->warning('admin snapshot send failed', [
                        'connection_id' => $cid,
                        'error' => $e->getMessage(),
                    ]);
                }
            }
        } catch (Throwable $e) {
            Log::channel('ws')->error('admin snapshot tick exception', [
                'error' => $e->getMessage(),
            ]);
        }
    });
}
|
||
|
||
/**
 * Starts (once per process) the idle-connection reaper.
 *
 * Every HEARTBEAT_CHECK_INTERVAL seconds it asks the registry for connections whose
 * last activity is older than HEARTBEAT_IDLE_SECONDS and closes them, which forces the
 * client to reconnect instead of leaving a half-closed connection in the dispatch table.
 *
 * The tick body is wrapped in a try/catch, like the other timers in this class, so an
 * exception from the registry or the logger cannot escape the timer callback.
 */
private static function ensureHeartbeatChecker(): void
{
    if (self::$heartbeatCheckerStarted) {
        return;
    }
    self::$heartbeatCheckerStarted = true;

    Timer::add(self::HEARTBEAT_CHECK_INTERVAL, static function (): void {
        try {
            $cutoff = time() - self::HEARTBEAT_IDLE_SECONDS;
            $stale = GameWebSocketSubscriptionRegistry::staleConnections($cutoff);
            if ($stale === []) {
                return;
            }

            foreach ($stale as $cid) {
                Log::channel('ws')->info('close idle connection', [
                    'connection_id' => $cid,
                    'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($cid),
                    'idle_seconds' => self::HEARTBEAT_IDLE_SECONDS,
                ]);

                if (!isset(self::$connections[$cid])) {
                    continue;
                }
                try {
                    // Closing triggers onClose(), which performs the registry cleanup.
                    self::$connections[$cid]->close();
                } catch (Throwable) {
                    // Best effort: one failing close must not stop the sweep.
                }
            }
        } catch (Throwable $e) {
            Log::channel('ws')->error('heartbeat checker tick exception', [
                'error' => $e->getMessage(),
            ]);
        }
    });

    Log::channel('ws')->info('heartbeat checker started', [
        'idle_seconds' => self::HEARTBEAT_IDLE_SECONDS,
        'check_interval' => self::HEARTBEAT_CHECK_INTERVAL,
    ]);
}
|
||
|
||
/**
 * Fallback driver for round progression: starts (once per process) a 1-second timer
 * that calls GameLiveService::recoverLiveRoundState().
 *
 * Intended for the case where the gameLiveTicker process is not running or is stuck,
 * so rounds do not stay in the "paying out" state (status=3) indefinitely.
 *
 * Per the original author's note, recoverLiveRoundState() runs finalizePayoutGrace +
 * tickAutoDraw, is idempotent and lock-protected, and can safely coexist with
 * gameLiveTicker — not verifiable from this file.
 */
private static function ensureLiveStateTicker(): void
{
    if (self::$liveStateTickerStarted) {
        return;
    }
    self::$liveStateTickerStarted = true;

    $recoverRound = static function (): void {
        try {
            GameLiveService::recoverLiveRoundState();
        } catch (Throwable $failure) {
            // Log and keep ticking; the next second retries.
            Log::channel('ws')->error('live state ticker failed', [
                'error' => $failure->getMessage(),
            ]);
        }
    };
    Timer::add(1, $recoverRound);

    Log::channel('ws')->info('live state ticker started', [
        'tick_interval_seconds' => 1,
    ]);
}
|
||
|
||
/**
 * Admin integration aid: right after a subscribe, pushes the demo frames that
 * GameWebSocketPayloadHelper::adminTestPushFrames() produces for the given topics.
 *
 * Per the original author's note, the frames use a sample player from the database
 * and carry is_test / preview markers — not verifiable from this file.
 *
 * @param TcpConnection $connection Admin connection that just subscribed.
 * @param list<string>  $topics     The connection's topic list after the subscribe.
 */
private static function pushAdminTestOddsPreview(TcpConnection $connection, array $topics): void
{
    $previewFrames = GameWebSocketPayloadHelper::adminTestPushFrames($topics);
    if ($previewFrames === []) {
        return;
    }

    // One timestamp shared by every frame of this burst.
    $now = time();
    foreach ($previewFrames as $preview) {
        $body = [
            'topic' => $preview['topic'],
            'data' => $preview['data'],
            'server_time' => $now,
        ];
        GameWebSocketDispatcher::sendDirect($connection, $preview['event'], $body, 'admin_test_preview');
    }
}
|
||
|
||
/**
 * Resolves the query string of the WebSocket handshake URI across Workerman versions.
 *
 * Sources are consulted from most to least specific to this handshake:
 *  1. `$connection->wsHandshakeQuery`, when set to a string;
 *  2. the `$request` argument — a raw HTTP string (request line is parsed) or an object
 *     exposing `queryString()`;
 *  3. `$connection->headers['get'|'GET']`, when present
 *     (NOTE(review): the exact format of this value is not visible here — confirm);
 *  4. the process-global `$_SERVER['REQUEST_URI']`, as a last resort.
 *
 * Connection-scoped data is preferred over `$_SERVER` because the worker is a
 * long-lived process and superglobals are shared by every connection it serves.
 *
 * @param TcpConnection $connection Connection performing the handshake.
 * @param mixed         $request    Handshake request as provided by Workerman (string, object or null).
 *
 * @return string The query string without the leading "?", or '' when none is found.
 */
private static function extractQueryString(TcpConnection $connection, mixed $request): string
{
    if (isset($connection->wsHandshakeQuery) && is_string($connection->wsHandshakeQuery)) {
        return $connection->wsHandshakeQuery;
    }

    $uri = '';

    if (is_string($request)) {
        // Raw HTTP request: take the target of the "GET <uri> HTTP/1.1" request line.
        if (preg_match('#^GET\s+([^\s]+)#i', $request, $m) === 1) {
            $uri = $m[1];
        }
    } elseif (is_object($request) && method_exists($request, 'queryString')) {
        // Prefix with "?" so the common split below also handles this case.
        $uri = '?' . (string) $request->queryString();
    }

    if ($uri === '' && isset($connection->headers) && is_array($connection->headers)) {
        $line = $connection->headers['get'] ?? $connection->headers['GET'] ?? '';
        if (is_string($line)) {
            $uri = $line;
        }
    }

    if ($uri === '' && isset($_SERVER['REQUEST_URI']) && is_string($_SERVER['REQUEST_URI'])) {
        $uri = $_SERVER['REQUEST_URI'];
    }

    $qPos = strpos($uri, '?');

    return $qPos === false ? '' : substr($uri, $qPos + 1);
}
|
||
|
||
/**
 * Returns the peer IP of a connection as a string, or '' when it cannot be determined
 * (the connection has no getRemoteIp() method, or the call throws).
 *
 * @param TcpConnection $connection Connection to inspect.
 */
private static function remoteIp(TcpConnection $connection): string
{
    if (!method_exists($connection, 'getRemoteIp')) {
        return '';
    }

    try {
        return (string) $connection->getRemoteIp();
    } catch (Throwable) {
        // Used for logging only, so an empty string is an acceptable fallback.
        return '';
    }
}
|
||
}
|