Files
webman-buildadmin/app/common/service/GameWebSocketDispatcher.php

171 lines
5.9 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace app\common\service;
use support\Log;
use Throwable;
use Workerman\Connection\TcpConnection;
/**
 * WebSocket event dispatcher (intended to be called only from inside the
 * gameWebSocketServer process).
 *
 * Responsibilities, per the original design notes:
 *  1. Deliver events consumed from the GameWebSocketEventBus queue.
 *  2. Look up candidate connection ids through the per-topic reverse index.
 *  3. For user-level topics (bet.win / user.streak / wallet.changed /
 *     bet.accepted, ...) compare data.user_id with the user id bound to the
 *     connection and deliver only to the owner.
 *  4. Log every step on the "ws" channel so that "why did the push not arrive"
 *     can be diagnosed.
 */
final class GameWebSocketDispatcher
{
    /**
     * Topics whose data.user_id must equal the user id bound to the connection
     * before the frame is delivered. Every other topic (period.tick /
     * period.opened / jackpot.hit / admin.*, ...) is broadcast to all subscribers.
     *
     * Kept in sync with docs/36字花-移动端接口设计草案.md §7.1.2A.
     *
     * @var list<string>
     */
    private const USER_SCOPED_TOPICS = [
        'bet.win',
        'user.streak',
        'wallet.changed',
        'bet.accepted',
        'auto.spin.progress',
    ];

    /**
     * Dispatch a single event to every connection it applies to.
     *
     * Events with a missing or empty topic are ignored. For user-scoped topics
     * the ownership filter is always applied: an event whose data.user_id is
     * missing or invalid is delivered only to connections bound to user_id 0
     * and is never broadcast to regular users.
     *
     * @param array{topic:string, event:string, data:array<string,mixed>, server_time:int} $event
     * @param array<int, TcpConnection> $connections connection_id => TcpConnection
     */
    public static function dispatch(array $event, array $connections): void
    {
        $topic = $event['topic'] ?? '';
        if (!is_string($topic) || $topic === '') {
            return;
        }

        $candidateIds = GameWebSocketSubscriptionRegistry::connectionsForTopic($topic);
        if ($candidateIds === []) {
            Log::channel('ws')->debug('dispatch skip: no subscriber', [
                'topic' => $topic,
                'queue_server_time' => $event['server_time'] ?? 0,
            ]);
            return;
        }

        $rawData = is_array($event['data'] ?? null) ? $event['data'] : [];

        $userScoped = in_array($topic, self::USER_SCOPED_TOPICS, true);
        $payloadUserId = 0;
        if ($userScoped) {
            $parsed = filter_var($rawData['user_id'] ?? 0, FILTER_VALIDATE_INT);
            $payloadUserId = $parsed === false ? 0 : (int) $parsed;
            if ($payloadUserId <= 0) {
                // Fail closed: without a valid owner the event must not reach regular users.
                Log::channel('ws')->warning('dispatch: user-scoped event without valid user_id', [
                    'topic' => $topic,
                    'payload_user_id' => $payloadUserId,
                ]);
            }
        }

        $clientData = GameWebSocketPayloadHelper::sanitizeOutboundData($rawData);
        $frame = json_encode([
            'event' => $event['event'] ?? $topic,
            'topic' => $topic,
            'data' => $clientData,
            'server_time' => $event['server_time'] ?? time(),
        ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        if (!is_string($frame) || $frame === '') {
            Log::channel('ws')->warning('dispatch skip: invalid json frame', [
                'topic' => $topic,
                'error' => json_last_error_msg(),
            ]);
            return;
        }

        $matched = 0;
        $skippedNotOwner = 0;
        $skippedClosed = 0;
        $sendFailed = 0;

        foreach ($candidateIds as $cid) {
            if (!isset($connections[$cid])) {
                // Still present in the subscription index but no longer a live connection.
                $skippedClosed++;
                continue;
            }

            if ($userScoped) {
                $boundUid = GameWebSocketSubscriptionRegistry::userIdOf($cid);
                // Admin / ws debugging scenario: admin connections are bound to user_id=0
                // at handshake and may observe every user-scoped topic (e.g. bet.win).
                // Mobile user connections must match data.user_id strictly so that other
                // users' events are never leaked.
                // NOTE(review): this assumes only admin connections are ever bound to 0 —
                // confirm that unauthenticated mobile connections cannot end up with 0.
                if ($boundUid !== 0 && $boundUid !== $payloadUserId) {
                    $skippedNotOwner++;
                    continue;
                }
            }

            try {
                // send() signals failure by returning false rather than by throwing.
                $sent = $connections[$cid]->send($frame);
            } catch (Throwable $e) {
                $sendFailed++;
                Log::channel('ws')->warning('dispatch send failed', [
                    'topic' => $topic,
                    'connection_id' => $cid,
                    'error' => $e->getMessage(),
                ]);
                continue;
            }

            if ($sent === false) {
                $sendFailed++;
                Log::channel('ws')->warning('dispatch send failed', [
                    'topic' => $topic,
                    'connection_id' => $cid,
                    'error' => 'send() returned false',
                ]);
                continue;
            }

            $matched++;
        }

        Log::channel('ws')->info('dispatch', [
            'topic' => $topic,
            'user_scoped' => $userScoped,
            'payload_user_id' => $payloadUserId,
            'candidates' => count($candidateIds),
            'matched' => $matched,
            'skipped_not_owner' => $skippedNotOwner,
            'skipped_closed' => $skippedClosed,
            'send_failed' => $sendFailed,
            'frame_size' => strlen($frame),
        ]);
    }

    /**
     * Send a single frame straight to one connection (handshake ack /
     * subscription ack / pong / demo frame).
     *
     * For non-control events, a nested array under the "data" key is passed
     * through GameWebSocketPayloadHelper::sanitizeOutboundData(). For
     * "ws.connected" the top-level "user_id" key is removed before sending.
     *
     * @param TcpConnection       $connection target connection
     * @param string              $event      event name written to the frame's "event" field
     * @param array<string,mixed> $data       extra top-level frame fields; merged after "event",
     *                                        so an "event" key in $data overrides $event
     * @param string              $tag        free-form label included in the log entries only
     */
    public static function sendDirect(TcpConnection $connection, string $event, array $data, string $tag = ''): void
    {
        $controlEvents = ['ws.connected', 'ws.subscribed', 'ws.error', 'pong'];
        $payload = $data;

        if (
            !in_array($event, $controlEvents, true)
            && isset($payload['data'])
            && is_array($payload['data'])
        ) {
            $payload['data'] = GameWebSocketPayloadHelper::sanitizeOutboundData($payload['data']);
        }
        if ($event === 'ws.connected') {
            unset($payload['user_id']);
        }

        $frame = json_encode(array_merge([
            'event' => $event,
        ], $payload), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        if (!is_string($frame) || $frame === '') {
            // Previously dropped silently; log it so a missing ack can be traced.
            Log::channel('ws')->warning('direct send skip: invalid json frame', [
                'connection_id' => $connection->id,
                'event' => $event,
                'tag' => $tag,
                'error' => json_last_error_msg(),
            ]);
            return;
        }

        try {
            // send() signals failure by returning false rather than by throwing.
            $sent = $connection->send($frame);
        } catch (Throwable $e) {
            Log::channel('ws')->warning('direct send failed', [
                'connection_id' => $connection->id,
                'event' => $event,
                'tag' => $tag,
                'error' => $e->getMessage(),
            ]);
            return;
        }

        if ($sent === false) {
            Log::channel('ws')->warning('direct send failed', [
                'connection_id' => $connection->id,
                'event' => $event,
                'tag' => $tag,
                'error' => 'send() returned false',
            ]);
            return;
        }

        Log::channel('ws')->debug('direct send', [
            'connection_id' => $connection->id,
            'event' => $event,
            'tag' => $tag,
            'frame_size' => strlen($frame),
        ]);
    }
}