Files
webman-buildadmin/app/common/service/GameWebSocketSubscriptionRegistry.php

197 lines
6.0 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace app\common\service;
/**
 * In-process WebSocket subscription registry (intended to be used only inside the
 * single gameWebSocketServer process).
 *
 * - Maintains a two-way index:
 *   - topic => map<connection_id, true>
 *   - connection => [ topics: list<string>, user_id: int, mode: string, last_seen_at: int, remote_ip: string ]
 * - The dispatcher looks up the candidate connections for a topic directly, then filters
 *   by data.user_id before sending, which avoids a full O(N) scan over all connections.
 * - Per the original author's note, it MUST live in the same process as GameWebSocketServer
 *   (count=1); state is held in static properties, so it is not shared between workers.
 *
 * This class stores only connection ids and metadata, never connection objects; the
 * server is expected to keep its own connection_id => connection map.
 */
final class GameWebSocketSubscriptionRegistry
{
    /**
     * topic => { connection_id: true }.
     *
     * PHP stores integer-like topic names (e.g. "42") under int keys; lookups by the
     * string form still hit the same slot.
     *
     * @var array<string, array<int, true>>
     */
    private static array $topicIndex = [];

    /** @var array<int, array{topics: list<string>, user_id: int, mode: string, last_seen_at: int, remote_ip: string}> */
    private static array $connectionMeta = [];

    /**
     * Registers a connection (called from onConnect).
     *
     * Non-positive ids are ignored. Any mode other than "admin" (after trimming) is stored
     * as "mobile"; negative user ids are clamped to 0. Registering an id that is already
     * known resets its metadata and drops its previous subscriptions from the topic index.
     */
    public static function registerConnection(int $connectionId, int $userId, string $mode = 'mobile', string $remoteIp = ''): void
    {
        if ($connectionId <= 0) {
            return;
        }

        // The metadata below starts with an empty topic list, so subscriptions left over
        // from a previous registration of this id must leave the topic index as well;
        // otherwise they could never be cleaned up again.
        if (isset(self::$connectionMeta[$connectionId])) {
            self::detachFromTopics($connectionId, self::$connectionMeta[$connectionId]['topics']);
        }

        self::$connectionMeta[$connectionId] = [
            'topics' => [],
            'user_id' => max(0, $userId),
            'mode' => trim($mode) === 'admin' ? 'admin' : 'mobile',
            'last_seen_at' => time(),
            'remote_ip' => $remoteIp,
        ];
    }

    /**
     * Unregisters a connection (called from onClose): removes it from every topic it was
     * subscribed to and discards its metadata. Unknown ids are ignored.
     */
    public static function unregisterConnection(int $connectionId): void
    {
        if ($connectionId <= 0 || !isset(self::$connectionMeta[$connectionId])) {
            return;
        }

        self::detachFromTopics($connectionId, self::$connectionMeta[$connectionId]['topics']);
        unset(self::$connectionMeta[$connectionId]);
    }

    /**
     * Replaces the connection's subscription list (called for a subscribe message; the
     * new list overwrites the old one rather than being merged into it).
     *
     * Non-string entries, entries that are empty after trimming, and entries longer than
     * 64 bytes are dropped.
     *
     * @param list<string> $topics
     * @return list<string> the effective subscriptions, de-duplicated and sorted byte-wise;
     *                      empty when the connection is not registered
     */
    public static function replaceSubscriptions(int $connectionId, array $topics): array
    {
        if ($connectionId <= 0 || !isset(self::$connectionMeta[$connectionId])) {
            return [];
        }

        self::detachFromTopics($connectionId, self::$connectionMeta[$connectionId]['topics']);

        $unique = [];
        foreach ($topics as $candidate) {
            if (!is_string($candidate)) {
                continue;
            }
            $name = trim($candidate);
            if ($name === '' || strlen($name) > 64) {
                continue;
            }
            $unique[$name] = true;
        }

        // Array keys that look like integers come back from array_keys() as ints; cast
        // them so callers really receive strings, and sort as strings so the order is a
        // well-defined total order even when numeric and non-numeric names are mixed.
        $finalTopics = array_map(
            static fn ($key): string => (string) $key,
            array_keys($unique)
        );
        sort($finalTopics, SORT_STRING);

        self::$connectionMeta[$connectionId]['topics'] = $finalTopics;
        foreach ($finalTopics as $topic) {
            self::$topicIndex[$topic][$connectionId] = true;
        }

        return $finalTopics;
    }

    /**
     * Returns the ids of all connections subscribed to the given topic.
     *
     * @return list<int>
     */
    public static function connectionsForTopic(string $topic): array
    {
        $topic = trim($topic);
        if ($topic === '' || !isset(self::$topicIndex[$topic])) {
            return [];
        }

        return array_keys(self::$topicIndex[$topic]);
    }

    /**
     * Records "now" as the connection's last activity time (call on any inbound message;
     * used for heartbeat timeout checks). Unknown ids are ignored.
     */
    public static function touch(int $connectionId): void
    {
        if (isset(self::$connectionMeta[$connectionId])) {
            self::$connectionMeta[$connectionId]['last_seen_at'] = time();
        }
    }

    /**
     * Returns the stored metadata for a connection, or null when it is not registered.
     *
     * @return array{topics: list<string>, user_id: int, mode: string, last_seen_at: int, remote_ip: string}|null
     */
    public static function meta(int $connectionId): ?array
    {
        return self::$connectionMeta[$connectionId] ?? null;
    }

    /**
     * Returns the user id bound to the connection, or 0 when the connection is unknown.
     */
    public static function userIdOf(int $connectionId): int
    {
        return self::$connectionMeta[$connectionId]['user_id'] ?? 0;
    }

    /**
     * Tells whether the connection was registered in "admin" mode; false for unknown ids.
     */
    public static function isAdmin(int $connectionId): bool
    {
        return (self::$connectionMeta[$connectionId]['mode'] ?? '') === 'admin';
    }

    /**
     * Lists the ids of all connections whose last_seen_at is strictly earlier than $cutoff
     * (lets the server proactively close zombie connections).
     *
     * @return list<int>
     */
    public static function staleConnections(int $cutoff): array
    {
        $stale = [];
        foreach (self::$connectionMeta as $cid => $meta) {
            if (($meta['last_seen_at'] ?? 0) < $cutoff) {
                $stale[] = $cid;
            }
        }

        return $stale;
    }

    /**
     * Number of currently registered connections (ops / diagnostics).
     */
    public static function connectionCount(): int
    {
        return count(self::$connectionMeta);
    }

    /**
     * Total number of (topic, connection) subscriptions currently held (ops / diagnostics).
     */
    public static function subscriptionCount(): int
    {
        $total = 0;
        foreach (self::$topicIndex as $subscribers) {
            $total += count($subscribers);
        }

        return $total;
    }

    /**
     * Clears both indexes. Meant for tests and process restarts only.
     */
    public static function reset(): void
    {
        self::$topicIndex = [];
        self::$connectionMeta = [];
    }

    /**
     * Removes the connection from the given topics in the topic index and drops any topic
     * bucket that becomes empty. Does not touch the connection's metadata.
     *
     * @param array<int, int|string> $topics
     */
    private static function detachFromTopics(int $connectionId, array $topics): void
    {
        foreach ($topics as $topic) {
            if (!isset(self::$topicIndex[$topic][$connectionId])) {
                continue;
            }
            unset(self::$topicIndex[$topic][$connectionId]);
            if (self::$topicIndex[$topic] === []) {
                unset(self::$topicIndex[$topic]);
            }
        }
    }
}