Files
webman-buildadmin/app/common/service/GameWebSocketEventBus.php
2026-05-27 10:28:39 +08:00

143 lines
4.6 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace app\common\service;
use support\Log;
use support\Redis;
use Throwable;
/**
* 通过 Redis 列表在不同进程间投递 WebSocket 事件。
*
* 入队失败统一返回 false 并写 ws 日志runtime/logs/ws.log便于排查"为什么没有推送"。
*/
final class GameWebSocketEventBus
{
private const KEY_QUEUE = 'dfw:v1:ws:event:queue';
private const MAX_BATCH = 100;
/**
* @param array<string, mixed> $data
* @return bool 是否成功入队false 表示 Redis 不可用或参数非法,调用方应避免标记"已推送"
*/
public static function publish(string $topic, array $data): bool
{
$topic = trim($topic);
if ($topic === '') {
return false;
}
$payload = [
'topic' => $topic,
'event' => $topic,
'data' => $data,
'server_time' => time(),
];
$json = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
if (!is_string($json) || $json === '') {
Log::channel('ws')->warning('publish skip: invalid json payload', [
'topic' => $topic,
]);
return false;
}
try {
$len = Redis::lPush(self::KEY_QUEUE, $json);
$ok = is_numeric($len) && (int) $len > 0;
if ($ok) {
$uid = filter_var($data['user_id'] ?? 0, FILTER_VALIDATE_INT);
Log::channel('ws')->info('publish', [
'topic' => $topic,
'user_id' => $uid === false ? 0 : (int) $uid,
'queue_len_after' => (int) $len,
'payload_size' => strlen($json),
]);
} else {
Log::channel('ws')->warning('publish lpush returned non-positive', [
'topic' => $topic,
'returned' => $len,
]);
}
return $ok;
} catch (Throwable $e) {
Log::channel('ws')->error('publish failed (redis exception)', [
'topic' => $topic,
'error' => $e->getMessage(),
]);
Log::warning('ws event bus publish failed', [
'topic' => $topic,
'error' => $e->getMessage(),
]);
return false;
}
}
/**
* @return list<array{topic:string,event:string,data:array<string,mixed>,server_time:int}>
*/
public static function popBatch(int $limit = self::MAX_BATCH): array
{
if ($limit <= 0) {
return [];
}
if ($limit > self::MAX_BATCH) {
$limit = self::MAX_BATCH;
}
$out = [];
try {
for ($i = 0; $i < $limit; $i++) {
$raw = Redis::rPop(self::KEY_QUEUE);
if (!is_string($raw) || $raw === '') {
break;
}
$decoded = json_decode($raw, true);
if (!is_array($decoded)) {
continue;
}
$topicRaw = $decoded['topic'] ?? '';
$eventRaw = $decoded['event'] ?? '';
$dataRaw = $decoded['data'] ?? [];
$serverTimeRaw = $decoded['server_time'] ?? time();
if (!is_string($topicRaw) || trim($topicRaw) === '') {
continue;
}
$topic = trim($topicRaw);
$event = is_string($eventRaw) && trim($eventRaw) !== '' ? trim($eventRaw) : $topic;
$data = is_array($dataRaw) ? $dataRaw : [];
$serverTime = filter_var($serverTimeRaw, FILTER_VALIDATE_INT);
if ($serverTime === false || $serverTime <= 0) {
$serverTime = time();
}
$out[] = [
'topic' => $topic,
'event' => $event,
'data' => $data,
'server_time' => $serverTime,
];
}
} catch (Throwable $e) {
Log::channel('ws')->error('popBatch failed (redis exception)', [
'error' => $e->getMessage(),
'popped' => count($out),
]);
return $out;
}
return $out;
}
/**
* 当前队列堆积长度(监控用)。
*/
public static function queueLength(): int
{
try {
$len = Redis::lLen(self::KEY_QUEUE);
return is_numeric($len) ? (int) $len : 0;
} catch (Throwable) {
return -1;
}
}
}