169 lines
5.7 KiB
PHP
169 lines
5.7 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace app\common\service;
|
||
|
||
use support\Log;
|
||
use Throwable;
|
||
use Workerman\Connection\TcpConnection;
|
||
|
||
/**
|
||
* WebSocket 事件分发器(仅 gameWebSocketServer 进程内调用)。
|
||
*
|
||
* 职责:
|
||
* 1. 从 GameWebSocketEventBus 队列消费事件
|
||
* 2. 按 topic 反向索引取出候选 connection_id
|
||
* 3. user 级主题(bet.win / user.streak / wallet.changed / bet.accepted 等)按
|
||
* data.user_id 与连接绑定 user_id 比对,仅命中本人才下发
|
||
* 4. 每一步都打 ws 日志,便于排查"为什么没收到推送"
|
||
*/
|
||
final class GameWebSocketDispatcher
|
||
{
|
||
/**
|
||
* 这些 topic 的 data.user_id 必须等于连接绑定的 user_id 才会下发;
|
||
* 其它 topic(period.tick / period.opened / jackpot.hit / admin.* 等)一律广播给订阅者。
|
||
*
|
||
* 与 docs/36字花-移动端接口设计草案.md §7.1.2A 保持一致。
|
||
*
|
||
* @var list<string>
|
||
*/
|
||
private const USER_SCOPED_TOPICS = [
|
||
'bet.win',
|
||
'user.streak',
|
||
'wallet.changed',
|
||
'bet.accepted',
|
||
'auto.spin.progress',
|
||
];
|
||
|
||
/**
|
||
* 分发单条事件到所有命中的连接。
|
||
*
|
||
* @param array{topic:string, event:string, data:array<string,mixed>, server_time:int} $event
|
||
* @param array<int, TcpConnection> $connections connection_id => TcpConnection
|
||
*/
|
||
public static function dispatch(array $event, array $connections): void
|
||
{
|
||
$topic = $event['topic'] ?? '';
|
||
if (!is_string($topic) || $topic === '') {
|
||
return;
|
||
}
|
||
|
||
$candidateIds = GameWebSocketSubscriptionRegistry::connectionsForTopic($topic);
|
||
if ($candidateIds === []) {
|
||
Log::channel('ws')->debug('dispatch skip: no subscriber', [
|
||
'topic' => $topic,
|
||
'queue_server_time' => $event['server_time'] ?? 0,
|
||
]);
|
||
return;
|
||
}
|
||
|
||
$userScoped = in_array($topic, self::USER_SCOPED_TOPICS, true);
|
||
$payloadUserId = 0;
|
||
if ($userScoped) {
|
||
$raw = $event['data']['user_id'] ?? 0;
|
||
$parsed = filter_var($raw, FILTER_VALIDATE_INT);
|
||
$payloadUserId = $parsed === false ? 0 : (int) $parsed;
|
||
}
|
||
|
||
$rawData = is_array($event['data'] ?? null) ? $event['data'] : [];
|
||
$clientData = GameWebSocketPayloadHelper::sanitizeOutboundData($rawData);
|
||
|
||
$frame = json_encode([
|
||
'event' => $event['event'] ?? $topic,
|
||
'topic' => $topic,
|
||
'data' => $clientData,
|
||
'server_time' => $event['server_time'] ?? time(),
|
||
], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
|
||
if (!is_string($frame) || $frame === '') {
|
||
Log::channel('ws')->warning('dispatch skip: invalid json frame', [
|
||
'topic' => $topic,
|
||
]);
|
||
return;
|
||
}
|
||
|
||
$matched = 0;
|
||
$skippedNotOwner = 0;
|
||
$skippedClosed = 0;
|
||
$sendFailed = 0;
|
||
|
||
foreach ($candidateIds as $cid) {
|
||
if (!isset($connections[$cid])) {
|
||
$skippedClosed++;
|
||
continue;
|
||
}
|
||
if ($userScoped && $payloadUserId > 0) {
|
||
$boundUid = GameWebSocketSubscriptionRegistry::userIdOf($cid);
|
||
if ($boundUid !== $payloadUserId) {
|
||
$skippedNotOwner++;
|
||
continue;
|
||
}
|
||
}
|
||
|
||
try {
|
||
$connections[$cid]->send($frame);
|
||
$matched++;
|
||
} catch (Throwable $e) {
|
||
$sendFailed++;
|
||
Log::channel('ws')->warning('dispatch send failed', [
|
||
'topic' => $topic,
|
||
'connection_id' => $cid,
|
||
'error' => $e->getMessage(),
|
||
]);
|
||
}
|
||
}
|
||
|
||
Log::channel('ws')->info('dispatch', [
|
||
'topic' => $topic,
|
||
'user_scoped' => $userScoped,
|
||
'payload_user_id' => $payloadUserId,
|
||
'candidates' => count($candidateIds),
|
||
'matched' => $matched,
|
||
'skipped_not_owner' => $skippedNotOwner,
|
||
'skipped_closed' => $skippedClosed,
|
||
'send_failed' => $sendFailed,
|
||
'frame_size' => strlen($frame),
|
||
]);
|
||
}
|
||
|
||
/**
|
||
* 直接向某连接下发单帧(握手回执 / 订阅回执 / pong / 演示帧)。
|
||
*/
|
||
public static function sendDirect(TcpConnection $connection, string $event, array $data, string $tag = ''): void
|
||
{
|
||
$controlEvents = ['ws.connected', 'ws.subscribed', 'ws.error', 'pong'];
|
||
$payload = $data;
|
||
if (!in_array($event, $controlEvents, true)) {
|
||
if (isset($payload['data']) && is_array($payload['data'])) {
|
||
$payload['data'] = GameWebSocketPayloadHelper::sanitizeOutboundData($payload['data']);
|
||
}
|
||
}
|
||
if ($event === 'ws.connected') {
|
||
unset($payload['user_id']);
|
||
}
|
||
|
||
$frame = json_encode(array_merge([
|
||
'event' => $event,
|
||
], $payload), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
|
||
if (!is_string($frame) || $frame === '') {
|
||
return;
|
||
}
|
||
try {
|
||
$connection->send($frame);
|
||
Log::channel('ws')->debug('direct send', [
|
||
'connection_id' => $connection->id,
|
||
'event' => $event,
|
||
'tag' => $tag,
|
||
'frame_size' => strlen($frame),
|
||
]);
|
||
} catch (Throwable $e) {
|
||
Log::channel('ws')->warning('direct send failed', [
|
||
'connection_id' => $connection->id,
|
||
'event' => $event,
|
||
'tag' => $tag,
|
||
'error' => $e->getMessage(),
|
||
]);
|
||
}
|
||
}
|
||
}
|