198 lines
6.7 KiB
PHP
198 lines
6.7 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace app\process;
|
||
|
||
use app\common\service\GameWebSocketEventBus;
|
||
use app\common\service\GameLiveService;
|
||
use Workerman\Connection\TcpConnection;
|
||
use Workerman\Timer;
|
||
|
||
/**
|
||
* 后台测试页 WebSocket 服务(仅用于连接联调)
|
||
*/
|
||
class GameWebSocketServer
|
||
{
|
||
/** @var array<int, TcpConnection> */
|
||
private static array $connections = [];
|
||
|
||
private static bool $eventBusConsumerStarted = false;
|
||
private static bool $adminSnapshotTickerStarted = false;
|
||
|
||
/**
|
||
* 从 Redis 队列拉取事件并推送给已订阅连接。
|
||
* 部分环境下 WebSocket 进程的 onWorkerStart 可能未触发,因此在首帧握手处也会兜底启动一次(全局仅注册一个 Timer)。
|
||
*/
|
||
private static function ensureEventBusConsumer(): void
|
||
{
|
||
if (self::$eventBusConsumerStarted) {
|
||
return;
|
||
}
|
||
self::$eventBusConsumerStarted = true;
|
||
Timer::add(1, static function (): void {
|
||
$events = GameWebSocketEventBus::popBatch();
|
||
if ($events === []) {
|
||
return;
|
||
}
|
||
foreach ($events as $event) {
|
||
$topic = $event['topic'] ?? '';
|
||
if (!is_string($topic) || $topic === '') {
|
||
continue;
|
||
}
|
||
$eventName = $event['event'] ?? $topic;
|
||
$data = $event['data'] ?? [];
|
||
if (!is_array($data)) {
|
||
$data = [];
|
||
}
|
||
$serverTime = $event['server_time'] ?? time();
|
||
foreach (self::$connections as $connection) {
|
||
$topics = $connection->topics ?? [];
|
||
if (!is_array($topics) || !in_array($topic, $topics, true)) {
|
||
continue;
|
||
}
|
||
$connection->send(json_encode([
|
||
'event' => $eventName,
|
||
'topic' => $topic,
|
||
'data' => $data,
|
||
'server_time' => $serverTime,
|
||
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
|
||
}
|
||
}
|
||
});
|
||
}
|
||
|
||
/**
|
||
* 兜底直推:admin.live.snapshot 每秒主动构建并广播。
|
||
* 目的:即使 Redis 队列不可用,也能保证 /admin/game/live 实时看到对局变化。
|
||
*/
|
||
private static function ensureAdminLiveSnapshotTicker(): void
|
||
{
|
||
if (self::$adminSnapshotTickerStarted) {
|
||
return;
|
||
}
|
||
self::$adminSnapshotTickerStarted = true;
|
||
Timer::add(1, static function (): void {
|
||
$hasAdminSubscriber = false;
|
||
foreach (self::$connections as $connection) {
|
||
$topics = $connection->topics ?? [];
|
||
if (is_array($topics) && in_array('admin.live.snapshot', $topics, true)) {
|
||
$hasAdminSubscriber = true;
|
||
break;
|
||
}
|
||
}
|
||
if (!$hasAdminSubscriber) {
|
||
return;
|
||
}
|
||
$snapshot = GameLiveService::buildSnapshot(null);
|
||
$payload = json_encode([
|
||
'event' => 'admin.live.snapshot',
|
||
'topic' => 'admin.live.snapshot',
|
||
'data' => $snapshot,
|
||
'server_time' => time(),
|
||
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
|
||
if (!is_string($payload) || $payload === '') {
|
||
return;
|
||
}
|
||
foreach (self::$connections as $connection) {
|
||
$topics = $connection->topics ?? [];
|
||
if (!is_array($topics) || !in_array('admin.live.snapshot', $topics, true)) {
|
||
continue;
|
||
}
|
||
$connection->send($payload);
|
||
}
|
||
});
|
||
}
|
||
|
||
public function onWorkerStart(): void
|
||
{
|
||
self::ensureEventBusConsumer();
|
||
self::ensureAdminLiveSnapshotTicker();
|
||
}
|
||
|
||
public function onConnect(TcpConnection $connection): void
|
||
{
|
||
$connection->topics = [];
|
||
self::$connections[$connection->id] = $connection;
|
||
}
|
||
|
||
public function onWebSocketConnect(TcpConnection $connection): void
|
||
{
|
||
self::ensureEventBusConsumer();
|
||
self::ensureAdminLiveSnapshotTicker();
|
||
$connection->send(json_encode([
|
||
'event' => 'ws.connected',
|
||
'message' => 'WebSocket connected',
|
||
'connection_id' => $connection->id,
|
||
'server_time' => time(),
|
||
'heartbeat_interval' => 30,
|
||
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
|
||
}
|
||
|
||
public function onMessage(TcpConnection $connection, string $payload): void
|
||
{
|
||
$decoded = json_decode($payload, true);
|
||
if (!is_array($decoded)) {
|
||
$connection->send(json_encode([
|
||
'event' => 'ws.error',
|
||
'message' => 'Invalid JSON payload',
|
||
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
|
||
return;
|
||
}
|
||
|
||
$action = $decoded['action'] ?? '';
|
||
if ($action === 'ping') {
|
||
$connection->send(json_encode([
|
||
'event' => 'pong',
|
||
'server_time' => date('Y-m-d H:i:s'),
|
||
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
|
||
return;
|
||
}
|
||
|
||
if ($action === 'subscribe') {
|
||
$topics = $decoded['topics'] ?? [];
|
||
$sanitized = [];
|
||
if (is_array($topics)) {
|
||
foreach ($topics as $topic) {
|
||
if (!is_string($topic)) {
|
||
continue;
|
||
}
|
||
$value = trim($topic);
|
||
if ($value === '') {
|
||
continue;
|
||
}
|
||
$sanitized[] = $value;
|
||
}
|
||
}
|
||
$connection->topics = array_values(array_unique($sanitized));
|
||
$connection->send(json_encode([
|
||
'event' => 'ws.subscribed',
|
||
'topics' => $connection->topics,
|
||
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
|
||
return;
|
||
}
|
||
|
||
$connection->send(json_encode([
|
||
'event' => 'ws.error',
|
||
'message' => 'Unsupported action',
|
||
'received_action' => $action,
|
||
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
|
||
}
|
||
|
||
public function onError(TcpConnection $connection, int $code, string $msg): void
|
||
{
|
||
$connection->send(json_encode([
|
||
'event' => 'ws.error',
|
||
'message' => 'Server internal error',
|
||
'code' => $code,
|
||
'detail' => $msg,
|
||
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
|
||
}
|
||
|
||
public function onClose(TcpConnection $connection): void
|
||
{
|
||
$connection->topics = [];
|
||
unset(self::$connections[$connection->id]);
|
||
}
|
||
}
|