143 lines
4.6 KiB
PHP
143 lines
4.6 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace app\common\service;
|
||
|
||
use support\Log;
|
||
use support\Redis;
|
||
use Throwable;
|
||
|
||
/**
 * Delivers WebSocket events between processes through a Redis list.
 *
 * Producers call publish(), which LPUSHes a JSON envelope onto the queue key;
 * a consumer calls popBatch(), which RPOPs envelopes from the other end, so
 * events are consumed in the order they were pushed.
 *
 * Enqueue failures always return false and are written to the "ws" log channel
 * (runtime/logs/ws.log), which makes "why was nothing pushed?" easy to trace.
 */
final class GameWebSocketEventBus
{
    /** Redis list key that holds the pending event envelopes. */
    private const KEY_QUEUE = 'dfw:v1:ws:event:queue';

    /** Upper bound on the number of entries popped by a single popBatch() call. */
    private const MAX_BATCH = 100;

    /**
     * Wraps $data in an event envelope and pushes it onto the Redis queue.
     *
     * The envelope carries the trimmed topic under both "topic" and "event",
     * the payload under "data" and the current Unix time under "server_time".
     *
     * @param string               $topic Event topic; surrounding whitespace is trimmed.
     * @param array<string, mixed> $data  Event payload. An integer-like "user_id" entry,
     *                                    when present, is echoed into the success log.
     *
     * @return bool True when LPUSH reported a positive list length. False when the
     *              topic is empty after trimming, the payload cannot be JSON-encoded,
     *              LPUSH returned a non-positive/non-numeric value, or an exception was
     *              thrown inside the Redis/log section. Callers should not mark the
     *              event as "pushed" when false is returned.
     */
    public static function publish(string $topic, array $data): bool
    {
        $topic = trim($topic);
        if ($topic === '') {
            return false;
        }

        $json = json_encode(
            [
                'topic' => $topic,
                'event' => $topic,
                'data' => $data,
                'server_time' => time(),
            ],
            JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES,
        );
        if (!is_string($json) || $json === '') {
            // Record the encoder's reason (bad UTF-8, depth, INF/NAN, ...) so the
            // failure can be diagnosed from the log alone.
            Log::channel('ws')->warning('publish skip: invalid json payload', [
                'topic' => $topic,
                'error' => json_last_error_msg(),
            ]);

            return false;
        }

        try {
            $len = Redis::lPush(self::KEY_QUEUE, $json);
            if (!is_numeric($len) || (int) $len <= 0) {
                Log::channel('ws')->warning('publish lpush returned non-positive', [
                    'topic' => $topic,
                    'returned' => $len,
                ]);

                return false;
            }

            // user_id is logged for traceability only; anything that is not an
            // integer-like value is reported as 0.
            $uid = filter_var($data['user_id'] ?? 0, FILTER_VALIDATE_INT);
            Log::channel('ws')->info('publish', [
                'topic' => $topic,
                'user_id' => $uid === false ? 0 : (int) $uid,
                'queue_len_after' => (int) $len,
                'payload_size' => strlen($json),
            ]);

            return true;
        } catch (Throwable $e) {
            // Logged to the ws channel and to the default channel.
            Log::channel('ws')->error('publish failed (redis exception)', [
                'topic' => $topic,
                'error' => $e->getMessage(),
            ]);
            Log::warning('ws event bus publish failed', [
                'topic' => $topic,
                'error' => $e->getMessage(),
            ]);

            return false;
        }
    }

    /**
     * Pops up to $limit event envelopes from the queue and returns them normalized.
     *
     * Popping stops early as soon as RPOP yields anything other than a non-empty
     * string, which is treated as "queue drained". Entries that are not a JSON
     * object/array or that lack a non-blank string topic are discarded; because
     * RPOP has already removed them from Redis, each discard is logged to the ws
     * channel so lost events remain traceable. Discarded entries still count
     * towards $limit.
     *
     * An exception raised while popping is logged and the entries collected so
     * far are returned.
     *
     * @param int $limit Maximum number of entries to pop; values <= 0 return an
     *                   empty list and values above MAX_BATCH are clamped to it.
     *
     * @return list<array{topic:string,event:string,data:array<string,mixed>,server_time:int}>
     */
    public static function popBatch(int $limit = self::MAX_BATCH): array
    {
        if ($limit <= 0) {
            return [];
        }
        $limit = min($limit, self::MAX_BATCH);

        $out = [];
        try {
            for ($i = 0; $i < $limit; $i++) {
                $raw = Redis::rPop(self::KEY_QUEUE);
                if (!is_string($raw) || $raw === '') {
                    break;
                }

                $decoded = json_decode($raw, true);
                $event = is_array($decoded) ? self::normalizeEvent($decoded) : null;
                if ($event === null) {
                    // The entry is already gone from Redis; leave a trace instead
                    // of dropping it silently.
                    Log::channel('ws')->warning('popBatch drop: malformed event payload', [
                        'payload_size' => strlen($raw),
                        'json_error' => json_last_error_msg(),
                    ]);

                    continue;
                }

                $out[] = $event;
            }
        } catch (Throwable $e) {
            Log::channel('ws')->error('popBatch failed (redis exception)', [
                'error' => $e->getMessage(),
                'popped' => count($out),
            ]);
        }

        return $out;
    }

    /**
     * Current backlog length of the queue (intended for monitoring).
     *
     * @return int The value reported by LLEN, 0 when that value is not numeric,
     *             or -1 when the call throws.
     */
    public static function queueLength(): int
    {
        try {
            $len = Redis::lLen(self::KEY_QUEUE);

            return is_numeric($len) ? (int) $len : 0;
        } catch (Throwable) {
            return -1;
        }
    }

    /**
     * Validates a decoded envelope and coerces it into the shape popBatch() returns.
     *
     * - "topic" must be a non-blank string, otherwise the envelope is rejected.
     * - "event" falls back to the topic when missing, non-string or blank.
     * - "data" falls back to an empty array when it is not an array.
     * - "server_time" falls back to the current time when it is missing, not
     *   integer-like, or not positive.
     *
     * @param array<mixed> $decoded Result of json_decode($raw, true).
     *
     * @return array{topic:string,event:string,data:array<string,mixed>,server_time:int}|null
     *         The normalized envelope, or null when the topic is unusable.
     */
    private static function normalizeEvent(array $decoded): ?array
    {
        $topic = $decoded['topic'] ?? '';
        if (!is_string($topic) || trim($topic) === '') {
            return null;
        }
        $topic = trim($topic);

        $event = $decoded['event'] ?? '';
        $event = is_string($event) && trim($event) !== '' ? trim($event) : $topic;

        $data = $decoded['data'] ?? [];

        $serverTime = filter_var($decoded['server_time'] ?? time(), FILTER_VALIDATE_INT);
        if ($serverTime === false || $serverTime <= 0) {
            $serverTime = time();
        }

        return [
            'topic' => $topic,
            'event' => $event,
            'data' => is_array($data) ? $data : [],
            'server_time' => $serverTime,
        ];
    }
}
|