Files
webman-buildadmin/app/process/GameWebSocketServer.php

225 lines
7.9 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace app\process;
use app\common\service\GameWebSocketEventBus;
use app\common\service\GameLiveService;
use app\common\service\GameWebSocketPayloadHelper;
use Workerman\Connection\TcpConnection;
use Workerman\Timer;
/**
 * WebSocket service for the admin test page (intended only for connection
 * integration testing).
 *
 * Responsibilities visible in this class:
 *  - tracks open connections and the topic list each one subscribed to;
 *  - once per second drains GameWebSocketEventBus and forwards each event to
 *    the connections subscribed to its topic;
 *  - once per second builds and broadcasts an "admin.live.snapshot" frame while
 *    at least one connection is subscribed to that topic;
 *  - answers the client actions "ping" and "subscribe".
 *
 * NOTE(review): no authentication/authorization is visible in this class; any
 * client may subscribe to any topic, including "admin.live.snapshot". Confirm
 * that access is restricted elsewhere.
 */
class GameWebSocketServer
{
    /** Flags shared by every JSON frame sent to clients. */
    private const JSON_FLAGS = JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES;

    /** Topic (and event name) of the periodically broadcast live snapshot. */
    private const TOPIC_ADMIN_LIVE_SNAPSHOT = 'admin.live.snapshot';

    /** Timer period, in seconds, of both background tickers. */
    private const TICK_INTERVAL = 1;

    /**
     * Value advertised to clients as "heartbeat_interval" in the welcome frame
     * (presumably seconds - confirm against the client implementation).
     */
    private const HEARTBEAT_INTERVAL = 30;

    /** @var array<int, TcpConnection> Open connections keyed by connection id. */
    private static array $connections = [];

    private static bool $eventBusConsumerStarted = false;

    private static bool $adminSnapshotTickerStarted = false;

    /** @var array<int, true> Ids of connections whose onError handler is currently running. */
    private static array $reportingError = [];

    /**
     * Pulls events from the Redis queue and pushes them to subscribed connections.
     *
     * In some environments the WebSocket process's onWorkerStart may not fire,
     * so this is also invoked as a fallback at handshake time; the static flag
     * guarantees that only one Timer is ever registered.
     */
    private static function ensureEventBusConsumer(): void
    {
        if (self::$eventBusConsumerStarted) {
            return;
        }
        self::$eventBusConsumerStarted = true;

        Timer::add(self::TICK_INTERVAL, static function (): void {
            $events = GameWebSocketEventBus::popBatch();
            if ($events === []) {
                return;
            }
            foreach ($events as $event) {
                $topic = $event['topic'] ?? '';
                if (!is_string($topic) || $topic === '') {
                    continue;
                }
                // Nobody is listening: skip the encoding work altogether.
                if (!self::hasSubscriber($topic)) {
                    continue;
                }
                $data = $event['data'] ?? [];
                // The payload is identical for every subscriber, so encode it once per event.
                $payload = self::encodeFrame([
                    'event' => $event['event'] ?? $topic,
                    'topic' => $topic,
                    'data' => is_array($data) ? $data : [],
                    'server_time' => $event['server_time'] ?? time(),
                ]);
                if ($payload === null) {
                    continue;
                }
                self::broadcast($topic, $payload);
            }
        });
    }

    /**
     * Fallback direct push: builds and broadcasts admin.live.snapshot every second.
     *
     * Purpose: even when the Redis queue is unavailable, /admin/game/live still
     * sees game changes in real time. Nothing is computed while no connection
     * is subscribed to the snapshot topic.
     */
    private static function ensureAdminLiveSnapshotTicker(): void
    {
        if (self::$adminSnapshotTickerStarted) {
            return;
        }
        self::$adminSnapshotTickerStarted = true;

        Timer::add(self::TICK_INTERVAL, static function (): void {
            if (!self::hasSubscriber(self::TOPIC_ADMIN_LIVE_SNAPSHOT)) {
                return;
            }
            // Kept aligned with GameLiveTicker: if only snapshots were pushed while the
            // live ticker is not running, the countdown would reach zero but the draw
            // would never happen.
            GameLiveService::finalizePayoutGrace();
            GameLiveService::tickAutoDraw();

            $payload = self::encodeFrame([
                'event' => self::TOPIC_ADMIN_LIVE_SNAPSHOT,
                'topic' => self::TOPIC_ADMIN_LIVE_SNAPSHOT,
                'data' => GameLiveService::buildSnapshot(null),
                'server_time' => time(),
            ]);
            if ($payload === null) {
                return;
            }
            self::broadcast(self::TOPIC_ADMIN_LIVE_SNAPSHOT, $payload);
        });
    }

    /**
     * Worker start hook: registers both background tickers.
     */
    public function onWorkerStart(): void
    {
        self::ensureEventBusConsumer();
        self::ensureAdminLiveSnapshotTicker();
    }

    /**
     * Registers a new connection with an empty topic list.
     */
    public function onConnect(TcpConnection $connection): void
    {
        $connection->topics = [];
        self::$connections[$connection->id] = $connection;
    }

    /**
     * Handshake hook: makes sure the tickers run (fallback for environments in
     * which onWorkerStart did not fire) and sends the "ws.connected" welcome frame.
     */
    public function onWebSocketConnect(TcpConnection $connection): void
    {
        self::ensureEventBusConsumer();
        self::ensureAdminLiveSnapshotTicker();
        self::sendFrame($connection, [
            'event' => 'ws.connected',
            'message' => 'WebSocket connected',
            'connection_id' => $connection->id,
            'server_time' => time(),
            'heartbeat_interval' => self::HEARTBEAT_INTERVAL,
        ]);
    }

    /**
     * Handles one client message.
     *
     * The payload must be a JSON object/array with an "action" key:
     *  - "ping"      replies with a "pong" frame;
     *  - "subscribe" replaces the connection's topic list with the sanitized
     *                "topics" array, confirms with "ws.subscribed" and then
     *                pushes the admin test preview frames;
     *  - anything else (or invalid JSON) is answered with a "ws.error" frame.
     *
     * @param string $payload Raw text received from the client.
     */
    public function onMessage(TcpConnection $connection, string $payload): void
    {
        $decoded = json_decode($payload, true);
        if (!is_array($decoded)) {
            self::sendFrame($connection, [
                'event' => 'ws.error',
                'message' => 'Invalid JSON payload',
            ]);
            return;
        }

        $action = $decoded['action'] ?? '';

        if ($action === 'ping') {
            // NOTE(review): server_time is a formatted date string here but a Unix
            // timestamp in every other frame; kept as-is for client compatibility.
            self::sendFrame($connection, [
                'event' => 'pong',
                'server_time' => date('Y-m-d H:i:s'),
            ]);
            return;
        }

        if ($action === 'subscribe') {
            // A subscribe request replaces (does not extend) the previous topic list.
            $connection->topics = self::sanitizeTopics($decoded['topics'] ?? []);
            self::sendFrame($connection, [
                'event' => 'ws.subscribed',
                'topics' => $connection->topics,
            ]);
            self::pushAdminTestOddsPreview($connection, $connection->topics);
            return;
        }

        self::sendFrame($connection, [
            'event' => 'ws.error',
            'message' => 'Unsupported action',
            'received_action' => $action,
        ]);
    }

    /**
     * Error hook: makes a best-effort attempt to tell the client about the error.
     *
     * Workerman reports send failures (for example a full send buffer or a peer
     * that already closed) through onError from inside send() itself, so sending
     * from this handler can re-enter it. The per-connection guard turns that
     * nested invocation into a no-op instead of unbounded recursion.
     *
     * @param int    $code Error code supplied by the framework.
     * @param string $msg  Error description supplied by the framework.
     */
    public function onError(TcpConnection $connection, int $code, string $msg): void
    {
        $id = $connection->id;
        if (isset(self::$reportingError[$id])) {
            return;
        }
        self::$reportingError[$id] = true;
        try {
            self::sendFrame($connection, [
                'event' => 'ws.error',
                'message' => 'Server internal error',
                'code' => $code,
                'detail' => $msg,
            ]);
        } finally {
            unset(self::$reportingError[$id]);
        }
    }

    /**
     * Forgets a closed connection and its subscriptions.
     */
    public function onClose(TcpConnection $connection): void
    {
        $connection->topics = [];
        unset(self::$connections[$connection->id]);
    }

    /**
     * Admin integration testing: right after a subscription, pushes the demo
     * frames that GameWebSocketPayloadHelper::adminTestPushFrames() returns for
     * the given topics (per the original note: built-in sample players, flagged
     * is_test / preview).
     *
     * @param list<string> $topics
     */
    private static function pushAdminTestOddsPreview(TcpConnection $connection, array $topics): void
    {
        $frames = GameWebSocketPayloadHelper::adminTestPushFrames($topics);
        if ($frames === []) {
            return;
        }
        // One timestamp for the whole batch so all preview frames agree.
        $serverTime = time();
        foreach ($frames as $frame) {
            self::sendFrame($connection, [
                'event' => $frame['event'],
                'topic' => $frame['topic'],
                'data' => $frame['data'],
                'server_time' => $serverTime,
            ]);
        }
    }

    /**
     * Normalizes a client-supplied topic list: keeps only non-empty strings,
     * trims them and removes duplicates.
     *
     * @param mixed $topics Raw "topics" value taken from the decoded message.
     *
     * @return list<string>
     */
    private static function sanitizeTopics($topics): array
    {
        if (!is_array($topics)) {
            return [];
        }
        $sanitized = [];
        foreach ($topics as $topic) {
            if (!is_string($topic)) {
                continue;
            }
            $value = trim($topic);
            if ($value !== '') {
                $sanitized[] = $value;
            }
        }

        return array_values(array_unique($sanitized));
    }

    /**
     * Tells whether the connection's topic list contains the given topic.
     */
    private static function isSubscribed(TcpConnection $connection, string $topic): bool
    {
        $topics = $connection->topics ?? [];

        return is_array($topics) && in_array($topic, $topics, true);
    }

    /**
     * Tells whether at least one tracked connection is subscribed to the topic.
     */
    private static function hasSubscriber(string $topic): bool
    {
        foreach (self::$connections as $connection) {
            if (self::isSubscribed($connection, $topic)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Sends an already encoded payload to every connection subscribed to the topic.
     */
    private static function broadcast(string $topic, string $payload): void
    {
        foreach (self::$connections as $connection) {
            if (self::isSubscribed($connection, $topic)) {
                $connection->send($payload);
            }
        }
    }

    /**
     * Encodes a frame as JSON.
     *
     * @param array<string, mixed> $frame
     *
     * @return string|null The JSON text, or null when encoding failed
     *                     (json_encode() returns false, e.g. on malformed UTF-8).
     */
    private static function encodeFrame(array $frame): ?string
    {
        $json = json_encode($frame, self::JSON_FLAGS);

        return is_string($json) && $json !== '' ? $json : null;
    }

    /**
     * Encodes a frame and sends it to one connection; a frame that cannot be
     * encoded is skipped rather than passing json_encode()'s false to send().
     *
     * @param array<string, mixed> $frame
     */
    private static function sendFrame(TcpConnection $connection, array $frame): void
    {
        $json = self::encodeFrame($frame);
        if ($json !== null) {
            $connection->send($json);
        }
    }
}