/**
 * In-process registry of WebSocket subscriptions, kept as two indexes:
 * - topic => { connection_id: true } map
 * - connection => [ topics: list<string>, user_id: int, mode: string, last_seen_at: int, remote_ip: string ]
 * - The dispatcher takes the candidate connection list directly by (topic), filters it by (data.user_id)
 *   and only then sends, which avoids an O(N) scan over every connection.
 * - **Must** be used in the same process as GameWebSocketServer: count=1, it cannot be scaled horizontally
 *   (connections cannot be shared between workers).
 *
 * This class holds no TcpConnection references, only connection ids and metadata; the Server maintains
 * the connection_id => TcpConnection mapping.
 */
final class GameWebSocketSubscriptionRegistry
{
    /** Longest accepted topic name, measured in bytes after trimming. */
    private const MAX_TOPIC_BYTES = 64;

    /** @var array<array-key, array<int, true>> topic => { connection_id: true } */
    private static array $topicIndex = [];

    /** @var array<int, array{topics: list<string>, user_id: int, mode: string, last_seen_at: int, remote_ip: string}> */
    private static array $connectionMeta = [];

    /**
     * Registers a new connection (called from onConnect).
     *
     * Non-positive ids are ignored. Any mode other than 'admin' (after trimming) is stored as 'mobile',
     * and negative user ids are clamped to 0. If the id is already registered, its previous
     * subscriptions are removed from the topic index first, so no orphaned index entries remain.
     *
     * @param int    $connectionId connection id, must be > 0
     * @param int    $userId       owning user id
     * @param string $mode         'admin' or anything else (treated as 'mobile')
     * @param string $remoteIp     remote address, stored verbatim
     */
    public static function registerConnection(int $connectionId, int $userId, string $mode = 'mobile', string $remoteIp = ''): void
    {
        if ($connectionId <= 0) {
            return;
        }

        // Overwriting the metadata alone would leave the id in the topic index with no way
        // to find and remove it later; unregisterConnection() is a no-op for unknown ids.
        self::unregisterConnection($connectionId);

        self::$connectionMeta[$connectionId] = [
            'topics' => [],
            'user_id' => max(0, $userId),
            'mode' => trim($mode) === 'admin' ? 'admin' : 'mobile',
            'last_seen_at' => time(),
            'remote_ip' => $remoteIp,
        ];
    }

    /**
     * Unregisters a connection (called from onClose): removes it from every topic index entry
     * and drops its metadata. Unknown or non-positive ids are ignored.
     */
    public static function unregisterConnection(int $connectionId): void
    {
        if ($connectionId <= 0 || !isset(self::$connectionMeta[$connectionId])) {
            return;
        }

        self::detachFromTopics($connectionId, self::$connectionMeta[$connectionId]['topics']);
        unset(self::$connectionMeta[$connectionId]);
    }

    /**
     * Replaces the subscription list of a connection (called for subscribe frames; overwrite
     * semantics, matching the protocol currently in production).
     *
     * @param array<mixed> $topics raw topics; non-strings, blank names and names longer than 64 bytes are dropped
     * @return list<string> the subscriptions actually in effect, de-duplicated and sorted;
     *                      empty when the connection is unknown
     */
    public static function replaceSubscriptions(int $connectionId, array $topics): array
    {
        if ($connectionId <= 0 || !isset(self::$connectionMeta[$connectionId])) {
            return [];
        }

        self::detachFromTopics($connectionId, self::$connectionMeta[$connectionId]['topics']);

        $finalTopics = self::normalizeTopics($topics);
        self::$connectionMeta[$connectionId]['topics'] = $finalTopics;
        foreach ($finalTopics as $topic) {
            self::$topicIndex[$topic][$connectionId] = true;
        }

        return $finalTopics;
    }

    /**
     * Incremental subscribe: merges topics into the existing subscription set (old topics are never removed).
     *
     * Accommodates clients that send several subscribe frames carrying only the newly added topics,
     * so a later frame does not overwrite earlier ones and drop subscriptions such as bet.win / jackpot.hit.
     *
     * @param array<mixed> $topics raw topics; filtered the same way as in replaceSubscriptions()
     * @return list<string> merged subscription list (de-duplicated and sorted);
     *                      empty when the connection is unknown
     */
    public static function mergeSubscriptions(int $connectionId, array $topics): array
    {
        if ($connectionId <= 0 || !isset(self::$connectionMeta[$connectionId])) {
            return [];
        }

        $current = self::$connectionMeta[$connectionId]['topics'];
        $incoming = self::normalizeTopics($topics);
        if ($incoming === []) {
            return $current;
        }

        // Writing an entry that already exists is idempotent, so no "is it new?" check is needed.
        foreach ($incoming as $topic) {
            self::$topicIndex[$topic][$connectionId] = true;
        }

        $finalTopics = self::normalizeTopics(array_merge($current, $incoming));
        self::$connectionMeta[$connectionId]['topics'] = $finalTopics;

        return $finalTopics;
    }

    /**
     * Returns the ids of all connections subscribed to the given topic.
     *
     * @param string $topic topic name; surrounding whitespace is ignored
     * @return list<int> connection ids, empty for a blank or unknown topic
     */
    public static function connectionsForTopic(string $topic): array
    {
        $topic = trim($topic);
        if ($topic === '' || !isset(self::$topicIndex[$topic])) {
            return [];
        }

        return array_keys(self::$topicIndex[$topic]);
    }

    /**
     * Marks a connection as active now (called on every received message; feeds the heartbeat
     * timeout check). Unknown ids are ignored.
     */
    public static function touch(int $connectionId): void
    {
        if (isset(self::$connectionMeta[$connectionId])) {
            self::$connectionMeta[$connectionId]['last_seen_at'] = time();
        }
    }

    /**
     * Returns the stored metadata of a connection.
     *
     * @return array{topics: list<string>, user_id: int, mode: string, last_seen_at: int, remote_ip: string}|null
     *         null when the connection is not registered
     */
    public static function meta(int $connectionId): ?array
    {
        return self::$connectionMeta[$connectionId] ?? null;
    }

    /**
     * Returns the user id bound to a connection, or 0 when the connection is not registered.
     */
    public static function userIdOf(int $connectionId): int
    {
        return self::$connectionMeta[$connectionId]['user_id'] ?? 0;
    }

    /**
     * Tells whether the connection was registered in 'admin' mode; false for unknown ids.
     */
    public static function isAdmin(int $connectionId): bool
    {
        return (self::$connectionMeta[$connectionId]['mode'] ?? '') === 'admin';
    }

    /**
     * Finds every connection whose last_seen_at is earlier than $cutoff (used by the server to
     * proactively close zombie connections).
     *
     * @param int $cutoff timestamp compared against last_seen_at (which is written from time())
     * @return list<int> ids of the stale connections
     */
    public static function staleConnections(int $cutoff): array
    {
        $stale = [];
        foreach (self::$connectionMeta as $connectionId => $meta) {
            if (($meta['last_seen_at'] ?? 0) < $cutoff) {
                $stale[] = $connectionId;
            }
        }

        return $stale;
    }

    /**
     * Number of currently registered connections (for operations / diagnostics).
     */
    public static function connectionCount(): int
    {
        return count(self::$connectionMeta);
    }

    /**
     * Total number of (topic, connection) subscriptions currently indexed (for operations / diagnostics).
     */
    public static function subscriptionCount(): int
    {
        $total = 0;
        foreach (self::$topicIndex as $connections) {
            $total += count($connections);
        }

        return $total;
    }

    /**
     * Clears both indexes; intended only for tests and process restarts.
     */
    public static function reset(): void
    {
        self::$topicIndex = [];
        self::$connectionMeta = [];
    }

    /**
     * Cleans a raw topic list: keeps only strings, trims them, drops blank names and names longer
     * than MAX_TOPIC_BYTES, de-duplicates and sorts the result.
     *
     * @param array<mixed> $topics
     * @return list<string>
     */
    private static function normalizeTopics(array $topics): array
    {
        $unique = [];
        foreach ($topics as $candidate) {
            if (!is_string($candidate)) {
                continue;
            }
            $topic = trim($candidate);
            if ($topic === '' || strlen($topic) > self::MAX_TOPIC_BYTES) {
                continue;
            }
            // The key de-duplicates; the value keeps the string type, because PHP casts
            // decimal-integer keys such as "123" to int and array_keys() would return ints.
            $unique[$topic] = $topic;
        }

        $list = array_values($unique);
        // Byte-wise ordering: a single total order even when some names look numeric.
        sort($list, SORT_STRING);

        return $list;
    }

    /**
     * Removes a connection from the index entries of the given topics, deleting any topic
     * bucket that becomes empty as a result.
     *
     * @param array<array-key> $topics topics the connection is currently indexed under
     */
    private static function detachFromTopics(int $connectionId, array $topics): void
    {
        foreach ($topics as $topic) {
            if (!isset(self::$topicIndex[$topic][$connectionId])) {
                continue;
            }
            unset(self::$topicIndex[$topic][$connectionId]);
            if (self::$topicIndex[$topic] === []) {
                unset(self::$topicIndex[$topic]);
            }
        }
    }
}