252 lines
7.8 KiB
PHP
252 lines
7.8 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace app\common\service;
|
||
|
||
/**
 * In-process WebSocket subscription registry (meant to be used only inside the
 * single gameWebSocketServer process).
 *
 * - Maintains a two-way index:
 *   - topic      => map<connection_id, true>
 *   - connection => [ topics: list<string>, user_id: int, mode: string, last_seen_at: int, remote_ip: string ]
 * - A dispatcher can fetch the candidate connections of a topic directly and then
 *   filter by user id before sending, instead of scanning every connection.
 * - **Must** live in the same process as GameWebSocketServer (count=1): the state is
 *   kept in static arrays, so it cannot be shared between several workers.
 *
 * This class never holds TcpConnection references, only connection ids and metadata;
 * the server is expected to keep its own connection_id => TcpConnection map.
 */
final class GameWebSocketSubscriptionRegistry
{
    /** Longest accepted topic name, in bytes (after trimming). */
    private const MAX_TOPIC_LENGTH = 64;

    /** The only privileged connection mode. */
    private const MODE_ADMIN = 'admin';

    /** Fallback mode applied to every value other than MODE_ADMIN. */
    private const MODE_MOBILE = 'mobile';

    /** @var array<string, array<int, true>> topic => { connection_id: true } */
    private static array $topicIndex = [];

    /** @var array<int, array{topics: list<string>, user_id: int, mode: string, last_seen_at: int, remote_ip: string}> */
    private static array $connectionMeta = [];

    /**
     * Registers a new connection (intended to be called from onConnect).
     *
     * Non-positive ids are ignored. Any mode other than "admin" (after trimming) is
     * stored as "mobile"; negative user ids are stored as 0.
     *
     * If the id is already registered, the previous registration is removed first so
     * that its topic index entries do not linger once the topic list is reset.
     */
    public static function registerConnection(int $connectionId, int $userId, string $mode = 'mobile', string $remoteIp = ''): void
    {
        if ($connectionId <= 0) {
            return;
        }

        // Re-registration: drop the old subscriptions from the topic index, otherwise
        // they would become unreachable (the new metadata starts with no topics).
        if (isset(self::$connectionMeta[$connectionId])) {
            self::unregisterConnection($connectionId);
        }

        $normalizedMode = trim($mode) === self::MODE_ADMIN ? self::MODE_ADMIN : self::MODE_MOBILE;

        self::$connectionMeta[$connectionId] = [
            'topics' => [],
            'user_id' => max(0, $userId),
            'mode' => $normalizedMode,
            'last_seen_at' => time(),
            'remote_ip' => $remoteIp,
        ];
    }

    /**
     * Unregisters a connection (intended to be called from onClose): removes it from
     * every topic index entry and drops its metadata. Unknown ids are ignored.
     */
    public static function unregisterConnection(int $connectionId): void
    {
        if (!self::isRegistered($connectionId)) {
            return;
        }

        self::detachFromTopicIndex($connectionId);
        unset(self::$connectionMeta[$connectionId]);
    }

    /**
     * Replaces the subscription list of a connection (overwrite semantics).
     *
     * Non-string entries, empty names and names longer than 64 bytes are dropped;
     * names are trimmed and de-duplicated.
     *
     * @param array<array-key, mixed> $topics
     * @return list<string> the effective topics, de-duplicated and sorted bytewise;
     *                      an empty list when the connection is not registered
     */
    public static function replaceSubscriptions(int $connectionId, array $topics): array
    {
        if (!self::isRegistered($connectionId)) {
            return [];
        }

        self::detachFromTopicIndex($connectionId);

        $finalTopics = self::normalizeTopics($topics);
        self::$connectionMeta[$connectionId]['topics'] = $finalTopics;
        foreach ($finalTopics as $topic) {
            self::$topicIndex[$topic][$connectionId] = true;
        }

        return $finalTopics;
    }

    /**
     * Incremental subscribe: merges $topics into the current subscription set without
     * removing any existing topic.
     *
     * Exists for clients that send several subscribe messages, each carrying only
     * the additional topics, so that a later message does not wipe earlier ones.
     *
     * @param array<array-key, mixed> $topics
     * @return list<string> the merged topics, de-duplicated and sorted bytewise;
     *                      an empty list when the connection is not registered
     */
    public static function mergeSubscriptions(int $connectionId, array $topics): array
    {
        if (!self::isRegistered($connectionId)) {
            return [];
        }

        $existing = self::$connectionMeta[$connectionId]['topics'];
        $incoming = self::normalizeTopics($topics);
        if ($incoming === []) {
            return $existing;
        }

        $mergedSet = [];
        foreach ($existing as $topic) {
            $mergedSet[$topic] = true;
        }
        foreach ($incoming as $topic) {
            $mergedSet[$topic] = true;
            // Writing an already indexed topic again is a no-op, so no need to
            // distinguish new topics from existing ones here.
            self::$topicIndex[$topic][$connectionId] = true;
        }

        $finalTopics = self::sortedTopicNames($mergedSet);
        self::$connectionMeta[$connectionId]['topics'] = $finalTopics;

        return $finalTopics;
    }

    /**
     * Returns the ids of all connections subscribed to the given topic
     * (the topic is trimmed before lookup).
     *
     * @return list<int>
     */
    public static function connectionsForTopic(string $topic): array
    {
        $name = trim($topic);
        if ($name === '') {
            return [];
        }

        return array_keys(self::$topicIndex[$name] ?? []);
    }

    /**
     * Refreshes the last-seen timestamp of a connection (meant to be called on any
     * incoming message so idle connections can be detected). Unknown ids are ignored.
     */
    public static function touch(int $connectionId): void
    {
        if (isset(self::$connectionMeta[$connectionId])) {
            self::$connectionMeta[$connectionId]['last_seen_at'] = time();
        }
    }

    /**
     * Returns the stored metadata of a connection, or null when it is not registered.
     *
     * @return array{topics: list<string>, user_id: int, mode: string, last_seen_at: int, remote_ip: string}|null
     */
    public static function meta(int $connectionId): ?array
    {
        return self::$connectionMeta[$connectionId] ?? null;
    }

    /**
     * Returns the user id bound to a connection, or 0 when it is not registered.
     */
    public static function userIdOf(int $connectionId): int
    {
        return self::$connectionMeta[$connectionId]['user_id'] ?? 0;
    }

    /**
     * Tells whether the connection was registered in "admin" mode.
     */
    public static function isAdmin(int $connectionId): bool
    {
        return (self::$connectionMeta[$connectionId]['mode'] ?? '') === self::MODE_ADMIN;
    }

    /**
     * Lists the ids of all connections whose last_seen_at is strictly older than
     * $cutoff (so the server can proactively close dead connections).
     *
     * @return list<int>
     */
    public static function staleConnections(int $cutoff): array
    {
        $stale = [];
        foreach (self::$connectionMeta as $connectionId => $meta) {
            if (($meta['last_seen_at'] ?? 0) < $cutoff) {
                $stale[] = $connectionId;
            }
        }

        return $stale;
    }

    /**
     * Number of currently registered connections (ops / diagnostics).
     */
    public static function connectionCount(): int
    {
        return count(self::$connectionMeta);
    }

    /**
     * Total number of (topic, connection) subscriptions (ops / diagnostics).
     */
    public static function subscriptionCount(): int
    {
        $total = 0;
        foreach (self::$topicIndex as $connections) {
            $total += count($connections);
        }

        return $total;
    }

    /**
     * Clears both indexes. Intended for tests and process restarts only.
     */
    public static function reset(): void
    {
        self::$topicIndex = [];
        self::$connectionMeta = [];
    }

    /**
     * Tells whether metadata exists for the given (positive) connection id.
     */
    private static function isRegistered(int $connectionId): bool
    {
        return $connectionId > 0 && isset(self::$connectionMeta[$connectionId]);
    }

    /**
     * Removes the connection from the topic index for every topic it currently has,
     * deleting topic buckets that become empty. The metadata itself is left untouched.
     */
    private static function detachFromTopicIndex(int $connectionId): void
    {
        foreach (self::$connectionMeta[$connectionId]['topics'] as $topic) {
            if (!isset(self::$topicIndex[$topic][$connectionId])) {
                continue;
            }
            unset(self::$topicIndex[$topic][$connectionId]);
            if (self::$topicIndex[$topic] === []) {
                unset(self::$topicIndex[$topic]);
            }
        }
    }

    /**
     * Sanitises raw topic input: keeps only strings, trims them, drops empty or
     * over-long names, de-duplicates and sorts.
     *
     * @param array<array-key, mixed> $topics
     * @return list<string>
     */
    private static function normalizeTopics(array $topics): array
    {
        $unique = [];
        foreach ($topics as $candidate) {
            if (!is_string($candidate)) {
                continue;
            }
            $name = trim($candidate);
            if ($name === '' || strlen($name) > self::MAX_TOPIC_LENGTH) {
                continue;
            }
            $unique[$name] = true;
        }

        return self::sortedTopicNames($unique);
    }

    /**
     * Converts a topic set (names as array keys) into a sorted list of strings.
     *
     * PHP casts integer-like string keys such as "123" to int, so the keys are cast
     * back to string here; SORT_STRING keeps the ordering purely bytewise.
     *
     * @param array<array-key, true> $topicSet
     * @return list<string>
     */
    private static function sortedTopicNames(array $topicSet): array
    {
        $names = array_map(
            static fn ($name): string => (string) $name,
            array_keys($topicSet)
        );
        sort($names, SORT_STRING);

        return $names;
    }
}
|