Files
webman-buildadmin/app/process/GameWebSocketServer.php

420 lines
16 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace app\process;
use app\common\service\GameLiveService;
use app\common\service\GameWebSocketAuthHelper;
use app\common\service\GameWebSocketDispatcher;
use app\common\service\GameWebSocketEventBus;
use app\common\service\GameWebSocketPayloadHelper;
use app\common\service\GameWebSocketSubscriptionRegistry;
use support\Log;
use Throwable;
use Workerman\Connection\TcpConnection;
use Workerman\Timer;
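/**
 * H5 / admin WebSocket service (refactored version, 2026-05).
 *
 * The design follows docs/36字花-移动端接口设计草案.md §7:
 *
 * - Handshake authentication (GameWebSocketAuthHelper):
 *   - mobile: URL query **`auth-token`** + **`user-token`** (same values as the HTTP headers), bound to a user_id
 *   - admin: URL query **`admin-ws-token`** (issued by the admin wsConfig endpoint), user_id=0;
 *     user-level topics are not filtered (operations / integration testing can observe everything)
 * - Client -> server: `{"action":"ping"}` / `{"action":"subscribe","topics":[...]}`
 * - Server -> client: `ws.connected` / `ws.subscribed` / `pong` / `ws.error` / business event frames
 * - Active heartbeat timeout: a connection that sends no upstream frame (ping included) within 60s
 *   is closed by the server, which triggers a client reconnect and keeps half-closed zombie
 *   connections out of the dispatch table
 * - Event dispatch is handled by GameWebSocketDispatcher; every publish / consume / dispatch / send /
 *   subscribe / close is written to the dedicated log channel runtime/logs/ws.log
 *
 * High availability:
 * - popBatch exceptions in the consumer timer are caught and logged inside the EventBus; the next tick recovers automatically
 * - A send failure on one connection does not affect delivery to the others (per-connection try/catch inside the Dispatcher)
 * - The admin.live.snapshot fallback timer pushes directly and is decoupled from queue consumption,
 *   so the admin console can still see the game state when Redis is failing
 */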
class GameWebSocketServer
{
/** @var array<int, TcpConnection> connection_id => TcpConnection (this process only) */
private static array $connections = [];
/** @var array<int, array<string, mixed>> connection_id => auth metadata (mode/admin_id etc., kept for logging and troubleshooting) */
private static array $connectionAuth = [];
/** Set by ensureEventBusConsumer() so the queue-consumer timer is only added once per process. */
private static bool $eventBusConsumerStarted = false;
/** Set by ensureAdminLiveSnapshotTicker() so the snapshot timer is only added once per process. */
private static bool $adminSnapshotTickerStarted = false;
/** Set by ensureHeartbeatChecker() so the idle-scan timer is only added once per process. */
private static bool $heartbeatCheckerStarted = false;
/** A client that sends no upstream frame (ping included) within 60s is actively disconnected. */
private const HEARTBEAT_IDLE_SECONDS = 60;
/** Event-queue polling interval in seconds (default 1s, matching doc §7.1.3); passed to Timer::add(). */
private const QUEUE_TICK_INTERVAL = 1;
/** Interval between heartbeat-timeout scans, in seconds. */
private const HEARTBEAT_CHECK_INTERVAL = 10;
/**
 * Worker start hook: logs the worker PID to the `ws` channel and makes sure the three
 * background timers (event-bus consumer, admin snapshot ticker, heartbeat checker) are running.
 */
public function onWorkerStart(): void
{
    // posix_getpid() is only available with the posix extension; getmypid() is the portable fallback.
    $pid = function_exists('posix_getpid') ? posix_getpid() : getmypid();
    Log::channel('ws')->info('ws worker start', ['pid' => $pid]);

    self::ensureEventBusConsumer();
    self::ensureAdminLiveSnapshotTicker();
    self::ensureHeartbeatChecker();
}
/**
 * Connect callback: resets the per-connection `topics` / `wsAuth` slots, records the
 * connection in the process-local connection table and writes a debug line to the `ws` channel.
 *
 * @param TcpConnection $connection The newly accepted connection.
 */
public function onConnect(TcpConnection $connection): void
{
    $connection->topics = [];
    $connection->wsAuth = null;
    self::$connections[$connection->id] = $connection;

    Log::channel('ws')->debug('tcp onConnect', [
        'connection_id' => $connection->id,
        // Use the shared helper so the lookup is guarded against exceptions and always
        // yields a string, exactly like the handshake path does.
        'remote' => self::remoteIp($connection),
    ]);
}
/**
 * WebSocket handshake callback.
 *
 * Depending on the Workerman version, $request may be a raw HTTP string or a request object;
 * extractQueryString() tries several sources to obtain the query string for maximum compatibility.
 *
 * Flow: make sure the background timers exist, authorize the query parameters through
 * GameWebSocketAuthHelper, then either
 * - accept: remember the auth result, register the connection in the subscription registry
 *   and answer with a `ws.connected` frame, or
 * - reject: log the denial, answer with a `ws.error` frame (code 1101) and close the connection.
 *
 * @param TcpConnection $connection Connection performing the handshake.
 * @param mixed         $request    Handshake request in whatever form the runtime supplies.
 */
public function onWebSocketConnect(TcpConnection $connection, mixed $request = null): void
{
    self::ensureEventBusConsumer();
    self::ensureAdminLiveSnapshotTicker();
    self::ensureHeartbeatChecker();

    $rawQuery = self::extractQueryString($connection, $request);
    $params = GameWebSocketAuthHelper::parseQueryString($rawQuery);
    $auth = GameWebSocketAuthHelper::authorize($params);
    $clientIp = self::remoteIp($connection);

    if ($auth['ok']) {
        // Keep the auth result both on the connection and in the process-local map.
        $connection->wsAuth = $auth;
        self::$connectionAuth[$connection->id] = $auth;
        GameWebSocketSubscriptionRegistry::registerConnection(
            $connection->id,
            (int) $auth['user_id'],
            $clientIp
        );

        Log::channel('ws')->info('handshake ok', [
            'connection_id' => $connection->id,
            'remote_ip' => $clientIp,
            'mode' => $auth['mode'],
            'user_id' => $auth['user_id'],
            'admin_id' => $auth['admin_id'],
        ]);

        $welcome = [
            'message' => 'WebSocket connected',
            'connection_id' => $connection->id,
            'mode' => $auth['mode'],
            'user_id' => $auth['user_id'],
            'server_time' => time(),
            'heartbeat_interval' => 30,
            'idle_timeout' => self::HEARTBEAT_IDLE_SECONDS,
        ];
        GameWebSocketDispatcher::sendDirect($connection, 'ws.connected', $welcome, 'handshake_ok');

        return;
    }

    // Rejected: only the parameter names are logged, never the token values.
    Log::channel('ws')->warning('handshake denied', [
        'connection_id' => $connection->id,
        'remote_ip' => $clientIp,
        'reason' => $auth['reason'],
        'query_keys' => array_keys($params),
    ]);

    $denial = [
        'code' => 1101,
        'message' => 'Authentication failed: ' . ($auth['reason'] ?: 'unauthorized'),
    ];
    GameWebSocketDispatcher::sendDirect($connection, 'ws.error', $denial, 'handshake_denied');

    try {
        $connection->close();
    } catch (Throwable) {
        // Closing is best-effort; there is nothing left to do for this connection.
    }
}
/**
 * Handles one upstream frame from a client.
 *
 * Every frame first refreshes the connection's activity timestamp in the subscription
 * registry. The payload must be a JSON object/array; the supported actions are:
 * - `ping`      -> answers with a `pong` frame carrying the server time
 * - `subscribe` -> replaces the connection's topic subscriptions, answers with
 *                  `ws.subscribed` and then pushes the admin test preview frames
 * Anything else is answered with a `ws.error` frame.
 *
 * @param TcpConnection $connection Sending connection.
 * @param string        $payload    Raw frame body.
 */
public function onMessage(TcpConnection $connection, string $payload): void
{
    $connectionId = $connection->id;
    GameWebSocketSubscriptionRegistry::touch($connectionId);

    $message = json_decode($payload, true);
    if (!is_array($message)) {
        Log::channel('ws')->debug('invalid json payload', [
            'connection_id' => $connectionId,
            'payload_size' => strlen($payload),
        ]);
        GameWebSocketDispatcher::sendDirect(
            $connection,
            'ws.error',
            ['message' => 'Invalid JSON payload'],
            'invalid_json'
        );

        return;
    }

    // A missing or non-string action is treated as the empty action.
    $rawAction = $message['action'] ?? null;
    $action = is_string($rawAction) ? trim($rawAction) : '';

    if ($action !== 'ping' && $action !== 'subscribe') {
        Log::channel('ws')->debug('unsupported action', [
            'connection_id' => $connectionId,
            'action' => $action,
        ]);
        GameWebSocketDispatcher::sendDirect($connection, 'ws.error', [
            'message' => 'Unsupported action',
            'received_action' => $action,
        ], 'unsupported_action');

        return;
    }

    if ($action === 'ping') {
        GameWebSocketDispatcher::sendDirect($connection, 'pong', [
            'server_time' => date('Y-m-d H:i:s'),
            'server_time_ts' => time(),
        ], 'pong');

        return;
    }

    // subscribe: a non-array `topics` value is treated as an empty request.
    $requested = $message['topics'] ?? [];
    $topicList = is_array($requested) ? $requested : [];
    $accepted = GameWebSocketSubscriptionRegistry::replaceSubscriptions($connectionId, $topicList);

    Log::channel('ws')->info('subscribe', [
        'connection_id' => $connectionId,
        'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($connectionId),
        'topics' => $accepted,
        'requested_count' => count($topicList),
    ]);
    GameWebSocketDispatcher::sendDirect($connection, 'ws.subscribed', ['topics' => $accepted], 'subscribed');
    self::pushAdminTestOddsPreview($connection, $accepted);
}
/**
 * Connection error callback: logs the error and makes a best-effort attempt to tell the
 * client about it with a `ws.error` frame.
 *
 * NOTE(review): Workerman reports send failures (for example a full send buffer) by invoking
 * onError from inside send(). Sending from this handler can therefore re-enter it; the
 * per-connection guard below stops that from recursing. Confirm against the installed version.
 *
 * @param TcpConnection $connection Connection the error belongs to.
 * @param int           $code       Error code supplied by the runtime.
 * @param string        $msg        Error description supplied by the runtime.
 */
public function onError(TcpConnection $connection, int $code, string $msg): void
{
    /** @var array<int, true> $replying connection ids currently inside the reply attempt */
    static $replying = [];

    $connectionId = $connection->id;
    Log::channel('ws')->warning('connection error', [
        'connection_id' => $connectionId,
        'code' => $code,
        'detail' => $msg,
    ]);

    if (isset($replying[$connectionId])) {
        // We are already inside a reply for this connection: the error was raised by our own send().
        return;
    }

    // JSON_INVALID_UTF8_SUBSTITUTE keeps encoding from failing when $msg carries invalid UTF-8.
    $frame = json_encode([
        'event' => 'ws.error',
        'message' => 'Server internal error',
        'code' => $code,
        'detail' => $msg,
    ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_INVALID_UTF8_SUBSTITUTE);
    if (!is_string($frame)) {
        // Never hand a non-string (false) to send().
        return;
    }

    $replying[$connectionId] = true;
    try {
        $connection->send($frame);
    } catch (Throwable $e) {
        // Still best-effort, but leave a trace instead of swallowing silently.
        Log::channel('ws')->debug('error frame send failed', [
            'connection_id' => $connectionId,
            'error' => $e->getMessage(),
        ]);
    } finally {
        unset($replying[$connectionId]);
    }
}
/**
 * Connection close callback: removes the connection from the subscription registry and the
 * process-local tables, then logs the close with the number of connections still registered.
 *
 * Cleanup runs before logging so that a logging failure cannot leave stale entries behind.
 * The remaining count is read after unregistering rather than computed as "count - 1",
 * which under-reported by one for connections that were never registered (for example a
 * denied handshake).
 *
 * @param TcpConnection $connection Connection being closed.
 */
public function onClose(TcpConnection $connection): void
{
    $connectionId = $connection->id;
    $auth = self::$connectionAuth[$connectionId] ?? [];
    // Must be looked up before the connection is unregistered.
    $userId = GameWebSocketSubscriptionRegistry::userIdOf($connectionId);

    GameWebSocketSubscriptionRegistry::unregisterConnection($connectionId);
    unset(self::$connections[$connectionId], self::$connectionAuth[$connectionId]);

    Log::channel('ws')->info('connection close', [
        'connection_id' => $connectionId,
        'user_id' => $userId,
        'mode' => $auth['mode'] ?? '',
        // presumably connectionCount() reflects the registry after unregisterConnection() — verify
        'remaining_connections' => max(0, GameWebSocketSubscriptionRegistry::connectionCount()),
    ]);
}
// ============================================================
// Internals: timers / fallbacks
// ============================================================
/**
 * Starts, at most once per process, the timer that polls the event bus every
 * QUEUE_TICK_INTERVAL seconds and hands each event to the dispatcher together with the
 * process-local connection table.
 *
 * Per the original author's note, popBatch already catches Redis exceptions internally and
 * writes them to the ws log. The try/catch blocks here additionally keep one failing event,
 * or one failing tick, from escaping the timer callback.
 */
private static function ensureEventBusConsumer(): void
{
    if (self::$eventBusConsumerStarted) {
        return;
    }
    self::$eventBusConsumerStarted = true;

    $tick = static function (): void {
        try {
            $batch = GameWebSocketEventBus::popBatch();
            if ($batch === []) {
                return;
            }
            foreach ($batch as $item) {
                try {
                    GameWebSocketDispatcher::dispatch($item, self::$connections);
                } catch (Throwable $dispatchError) {
                    // One bad event must not stop the rest of the batch.
                    Log::channel('ws')->error('dispatch loop exception', [
                        'topic' => $item['topic'] ?? '',
                        'error' => $dispatchError->getMessage(),
                    ]);
                }
            }
        } catch (Throwable $tickError) {
            Log::channel('ws')->error('event bus consumer tick exception', [
                'error' => $tickError->getMessage(),
            ]);
        }
    };
    Timer::add(self::QUEUE_TICK_INTERVAL, $tick);

    Log::channel('ws')->info('event bus consumer started', [
        'tick_interval_seconds' => self::QUEUE_TICK_INTERVAL,
    ]);
}
/**
 * admin.live.snapshot fallback: starts, at most once per process, a 1-second timer that
 * builds a live snapshot and sends it straight to every connection subscribed to
 * `admin.live.snapshot`, without going through the event queue.
 *
 * Per the original author's note this serves the /admin/game/live page, so the admin console
 * can still see the game state when Redis is failing.
 *
 * Nothing is built when there are no subscribers. Build, encode and per-connection send
 * failures are each logged to the `ws` channel and never escape the timer callback.
 */
private static function ensureAdminLiveSnapshotTicker(): void
{
    if (self::$adminSnapshotTickerStarted) {
        return;
    }
    self::$adminSnapshotTickerStarted = true;

    Timer::add(1, static function (): void {
        $cids = GameWebSocketSubscriptionRegistry::connectionsForTopic('admin.live.snapshot');
        if ($cids === []) {
            return;
        }

        try {
            $snapshot = GameLiveService::buildSnapshot(null);
        } catch (Throwable $e) {
            Log::channel('ws')->error('admin snapshot build failed', [
                'error' => $e->getMessage(),
            ]);
            return;
        }

        $payload = json_encode([
            'event' => 'admin.live.snapshot',
            'topic' => 'admin.live.snapshot',
            'data' => $snapshot,
            'server_time' => time(),
        ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        if (!is_string($payload) || $payload === '') {
            // Previously dropped silently; record why no snapshot was pushed this tick.
            Log::channel('ws')->error('admin snapshot encode failed', [
                'error' => json_last_error_msg(),
            ]);
            return;
        }

        foreach ($cids as $cid) {
            // The registry may still list a connection this process no longer holds.
            $target = self::$connections[$cid] ?? null;
            if ($target === null) {
                continue;
            }
            try {
                $target->send($payload);
            } catch (Throwable $e) {
                Log::channel('ws')->warning('admin snapshot send failed', [
                    'connection_id' => $cid,
                    'error' => $e->getMessage(),
                ]);
            }
        }
    });
}
/**
 * Heartbeat timeout checker: starts, at most once per process, a timer that runs every
 * HEARTBEAT_CHECK_INTERVAL seconds and closes the connections the subscription registry
 * reports as stale, using a cutoff of "now minus HEARTBEAT_IDLE_SECONDS".
 *
 * Per the original author's note, closing triggers a client reconnect and keeps half-closed
 * zombie connections from receiving pushes that are never actually delivered.
 */
private static function ensureHeartbeatChecker(): void
{
    if (self::$heartbeatCheckerStarted) {
        return;
    }
    self::$heartbeatCheckerStarted = true;

    $scan = static function (): void {
        $deadline = time() - self::HEARTBEAT_IDLE_SECONDS;
        $idleIds = GameWebSocketSubscriptionRegistry::staleConnections($deadline);
        if ($idleIds === []) {
            return;
        }

        foreach ($idleIds as $idleId) {
            Log::channel('ws')->info('close idle connection', [
                'connection_id' => $idleId,
                'user_id' => GameWebSocketSubscriptionRegistry::userIdOf($idleId),
                'idle_seconds' => self::HEARTBEAT_IDLE_SECONDS,
            ]);

            $idleConnection = self::$connections[$idleId] ?? null;
            if ($idleConnection === null) {
                // Reported by the registry but not held by this process: nothing to close.
                continue;
            }
            try {
                $idleConnection->close();
            } catch (Throwable) {
                // Closing is best-effort.
            }
        }
    };
    Timer::add(self::HEARTBEAT_CHECK_INTERVAL, $scan);

    Log::channel('ws')->info('heartbeat checker started', [
        'idle_seconds' => self::HEARTBEAT_IDLE_SECONDS,
        'check_interval' => self::HEARTBEAT_CHECK_INTERVAL,
    ]);
}
/**
 * Admin integration aid: right after a subscribe, pushes the demo frames that
 * GameWebSocketPayloadHelper::adminTestPushFrames() returns for the given topics.
 * Per the original author's note these are sample-player frames flagged is_test / preview
 * for odds-related topics.
 *
 * All frames of one call carry the same `server_time`.
 *
 * @param TcpConnection $connection Connection that just subscribed.
 * @param list<string>  $topics     Topics the connection is now subscribed to.
 */
private static function pushAdminTestOddsPreview(TcpConnection $connection, array $topics): void
{
    $previewFrames = GameWebSocketPayloadHelper::adminTestPushFrames($topics);
    if ($previewFrames === []) {
        return;
    }

    $now = time();
    foreach ($previewFrames as ['event' => $event, 'topic' => $topic, 'data' => $data]) {
        $body = [
            'topic' => $topic,
            'data' => $data,
            'server_time' => $now,
        ];
        GameWebSocketDispatcher::sendDirect($connection, $event, $body, 'admin_test_preview');
    }
}
/**
 * Resolves the query-string part of the handshake URI across Workerman versions, where the
 * second onWebSocketConnect argument may be a raw HTTP string or a request object.
 *
 * Sources, in priority order:
 *  1. `$connection->wsHandshakeQuery`, when it is a string (returned as-is)
 *  2. a raw HTTP request string: the target of the leading `GET <target>` line
 *  3. a request object exposing `queryString()`
 *  4. `$connection->headers['get']` / `['GET']`
 *  5. `$_SERVER['REQUEST_URI']`
 *
 * Sources tied to this handshake (1-4) are consulted before the process-global $_SERVER,
 * which in a long-running worker is not guaranteed to describe the current connection.
 *
 * @param TcpConnection $connection Connection performing the handshake.
 * @param mixed         $request    Handshake request as supplied by the runtime.
 *
 * @return string Query string without the leading `?`, or '' when none could be found.
 */
private static function extractQueryString(TcpConnection $connection, mixed $request): string
{
    if (isset($connection->wsHandshakeQuery) && is_string($connection->wsHandshakeQuery)) {
        return $connection->wsHandshakeQuery;
    }

    $uri = '';

    if (is_string($request) && preg_match('#^GET\s+([^\s]+)#i', $request, $m) === 1) {
        $uri = $m[1];
    }

    if ($uri === '' && is_object($request) && method_exists($request, 'queryString')) {
        // Prefix with '?' so the common extraction below applies unchanged.
        $uri = '?' . (string) $request->queryString();
    }

    if ($uri === '' && isset($connection->headers) && is_array($connection->headers)) {
        $line = $connection->headers['get'] ?? $connection->headers['GET'] ?? '';
        if (is_string($line)) {
            $uri = $line;
        }
    }

    if ($uri === '' && isset($_SERVER['REQUEST_URI']) && is_string($_SERVER['REQUEST_URI'])) {
        $uri = $_SERVER['REQUEST_URI'];
    }

    $qPos = strpos($uri, '?');

    return $qPos === false ? '' : substr($uri, $qPos + 1);
}
/**
 * Best-effort lookup of the peer IP address.
 *
 * @param TcpConnection $connection Connection to inspect.
 *
 * @return string The value of getRemoteIp() cast to string, or '' when the method is
 *                missing or the lookup throws.
 */
private static function remoteIp(TcpConnection $connection): string
{
    $ip = '';
    try {
        if (method_exists($connection, 'getRemoteIp')) {
            $ip = (string) $connection->getRemoteIp();
        }
    } catch (Throwable) {
        // Fall through with the empty default; the address is only used for logging/registration.
    }

    return $ip;
}
}