*/
    private const USER_SCOPED_TOPICS = [
        'bet.win',
        'user.streak',
        'wallet.changed',
        'bet.accepted',
        'auto.spin.progress',
    ];

    /**
     * Dispatch a single event to the connections registered for its topic.
     *
     * The candidate connection ids come from
     * GameWebSocketSubscriptionRegistry::connectionsForTopic(). The outbound
     * frame is JSON-encoded once and the same string is sent to every
     * recipient. For topics listed in USER_SCOPED_TOPICS whose payload carries
     * a positive integer `data.user_id`, only connections whose bound user id
     * (GameWebSocketSubscriptionRegistry::userIdOf()) is identical to it
     * receive the frame.
     *
     * Nothing is returned; the outcome is reported on the `ws` log channel.
     *
     * @param array{topic:string, event:string, data:array, server_time:int} $event
     * @param array $connections connection_id => TcpConnection
     */
    public static function dispatch(array $event, array $connections): void
    {
        $topic = $event['topic'] ?? '';
        if (!is_string($topic) || $topic === '') {
            return;
        }

        $candidateIds = GameWebSocketSubscriptionRegistry::connectionsForTopic($topic);
        if ($candidateIds === []) {
            Log::channel('ws')->debug('dispatch skip: no subscriber', [
                'topic' => $topic,
                'queue_server_time' => $event['server_time'] ?? 0,
            ]);

            return;
        }

        // Normalise the payload once; a missing or non-array `data` becomes [].
        $rawData = is_array($event['data'] ?? null) ? $event['data'] : [];

        $userScoped = in_array($topic, self::USER_SCOPED_TOPICS, true);
        $payloadUserId = 0;
        if ($userScoped) {
            // Anything that is not a valid integer collapses to 0 ("no owner").
            $parsed = filter_var($rawData['user_id'] ?? 0, FILTER_VALIDATE_INT);
            $payloadUserId = $parsed === false ? 0 : (int) $parsed;
        }

        $clientData = GameWebSocketPayloadHelper::sanitizeOutboundData($rawData);
        $frame = json_encode([
            'event' => $event['event'] ?? $topic,
            'topic' => $topic,
            'data' => $clientData,
            'server_time' => $event['server_time'] ?? time(),
        ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        if (!is_string($frame) || $frame === '') {
            Log::channel('ws')->warning('dispatch skip: invalid json frame', [
                'topic' => $topic,
                'error' => json_last_error_msg(),
            ]);

            return;
        }

        $matched = 0;
        $skippedNotOwner = 0;
        $skippedClosed = 0;
        $sendFailed = 0;

        foreach ($candidateIds as $cid) {
            // Registered for the topic but absent from the live connection map.
            if (!isset($connections[$cid])) {
                $skippedClosed++;
                continue;
            }

            // NOTE(review): when a user-scoped topic carries no valid positive
            // user_id this owner filter is skipped and the frame goes to every
            // subscriber of the topic. Confirm that broadcast fallback is
            // intended for topics such as `wallet.changed`.
            if ($userScoped && $payloadUserId > 0) {
                $boundUid = GameWebSocketSubscriptionRegistry::userIdOf($cid);
                if ($boundUid !== $payloadUserId) {
                    $skippedNotOwner++;
                    continue;
                }
            }

            // A send can fail either by throwing or by returning false
            // (presumably Workerman's TcpConnection::send(), which returns
            // false when the peer is closing or the send buffer is full).
            // Only a strict false counts as failure; true and null (queued
            // for asynchronous delivery) are treated as delivered.
            $error = null;
            try {
                if ($connections[$cid]->send($frame) === false) {
                    $error = 'send returned false';
                }
            } catch (Throwable $e) {
                $error = $e->getMessage();
            }

            if ($error === null) {
                $matched++;
                continue;
            }

            $sendFailed++;
            Log::channel('ws')->warning('dispatch send failed', [
                'topic' => $topic,
                'connection_id' => $cid,
                'error' => $error,
            ]);
        }

        Log::channel('ws')->info('dispatch', [
            'topic' => $topic,
            'user_scoped' => $userScoped,
            'payload_user_id' => $payloadUserId,
            'candidates' => count($candidateIds),
            'matched' => $matched,
            'skipped_not_owner' => $skippedNotOwner,
            'skipped_closed' => $skippedClosed,
            'send_failed' => $sendFailed,
            // Byte length of the encoded frame, not character count.
            'frame_size' => strlen($frame),
        ]);
    }

    /**
     * Send a single frame directly to one connection (handshake ack,
     * subscription ack, pong, demo frame).
     *
     * For events other than the control events (`ws.connected`,
     * `ws.subscribed`, `ws.error`, `pong`) a nested array under `data` is
     * passed through GameWebSocketPayloadHelper::sanitizeOutboundData().
     * For `ws.connected` the top-level `user_id` key is removed before
     * encoding.
     *
     * Failures (encoding or sending) are logged on the `ws` channel and never
     * propagated to the caller.
     *
     * @param TcpConnection $connection Target connection.
     * @param string        $event      Event name placed in the frame's `event` field.
     * @param array         $data       Remaining top-level frame fields; merged after
     *                                  `event`, so an `event` key in here takes precedence.
     * @param string        $tag        Free-form label that is only written to the log.
     */
    public static function sendDirect(TcpConnection $connection, string $event, array $data, string $tag = ''): void
    {
        $controlEvents = ['ws.connected', 'ws.subscribed', 'ws.error', 'pong'];

        $payload = $data;
        if (
            !in_array($event, $controlEvents, true)
            && isset($payload['data'])
            && is_array($payload['data'])
        ) {
            $payload['data'] = GameWebSocketPayloadHelper::sanitizeOutboundData($payload['data']);
        }
        if ($event === 'ws.connected') {
            unset($payload['user_id']);
        }

        $frame = json_encode(
            array_merge(['event' => $event], $payload),
            JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
        );
        if (!is_string($frame) || $frame === '') {
            // Mirror dispatch(): an unencodable frame is dropped, but not silently.
            Log::channel('ws')->warning('direct send skip: invalid json frame', [
                'connection_id' => $connection->id,
                'event' => $event,
                'tag' => $tag,
                'error' => json_last_error_msg(),
            ]);

            return;
        }

        // Same failure model as dispatch(): an exception or a strict false
        // return value both mean the frame was not handed to the connection.
        $error = null;
        try {
            if ($connection->send($frame) === false) {
                $error = 'send returned false';
            }
        } catch (Throwable $e) {
            $error = $e->getMessage();
        }

        if ($error !== null) {
            Log::channel('ws')->warning('direct send failed', [
                'connection_id' => $connection->id,
                'event' => $event,
                'tag' => $tag,
                'error' => $error,
            ]);

            return;
        }

        Log::channel('ws')->debug('direct send', [
            'connection_id' => $connection->id,
            'event' => $event,
            'tag' => $tag,
            'frame_size' => strlen($frame),
        ]);
    }
}