$data
     * @return bool True when the event was enqueued. False when the topic is blank, the payload
     *              cannot be JSON-encoded, or the Redis push fails or throws; on false the caller
     *              should not mark the event as "pushed".
     */
    public static function publish(string $topic, array $data): bool
    {
        $topic = trim($topic);
        if ($topic === '') {
            return false;
        }

        $payload = [
            'topic' => $topic,
            // The event name mirrors the topic at publish time.
            'event' => $topic,
            'data' => $data,
            'server_time' => time(),
        ];

        $json = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        if (!is_string($json) || $json === '') {
            // json_encode() returns false on e.g. invalid UTF-8 or excessive nesting in $data;
            // record why, otherwise the event disappears without any trace.
            Log::warning('ws event bus payload encode failed', [
                'topic' => $topic,
                'error' => json_last_error_msg(),
            ]);

            return false;
        }

        try {
            $len = Redis::lPush(self::KEY_QUEUE, $json);

            // Only a positive numeric reply counts as a successful enqueue.
            return is_numeric($len) && (int) $len > 0;
        } catch (Throwable $e) {
            Log::warning('ws event bus publish failed', [
                'topic' => $topic,
                'error' => $e->getMessage(),
            ]);

            return false;
        }
    }

    /**
     * Pops up to $limit entries from the queue and returns them as normalized events.
     *
     * Entries are removed with rPop while publish() adds them with lPush, so events enqueued
     * through publish() come out oldest-first. Entries that are not JSON arrays, or that lack a
     * non-blank string topic, are discarded; they still count toward $limit. If an exception is
     * thrown mid-batch, a warning is logged and the events popped so far are returned.
     *
     * @param int $limit Maximum number of entries to pop. Values <= 0 yield an empty list;
     *                   values above self::MAX_BATCH are clamped to it.
     *
     * @return list<array{topic:string,event:string,data:array<array-key,mixed>,server_time:int}>
     */
    public static function popBatch(int $limit = self::MAX_BATCH): array
    {
        if ($limit <= 0) {
            return [];
        }
        $limit = min($limit, self::MAX_BATCH);

        $out = [];
        try {
            for ($i = 0; $i < $limit; $i++) {
                $raw = Redis::rPop(self::KEY_QUEUE);
                if (!is_string($raw) || $raw === '') {
                    // No usable entry came back: treat the queue as drained.
                    break;
                }

                $decoded = json_decode($raw, true);
                if (!is_array($decoded)) {
                    continue;
                }

                $topicRaw = $decoded['topic'] ?? '';
                if (!is_string($topicRaw) || trim($topicRaw) === '') {
                    continue;
                }
                $topic = trim($topicRaw);

                // A missing or blank event name falls back to the topic.
                $eventRaw = $decoded['event'] ?? '';
                $event = is_string($eventRaw) && trim($eventRaw) !== '' ? trim($eventRaw) : $topic;

                $dataRaw = $decoded['data'] ?? [];
                $data = is_array($dataRaw) ? $dataRaw : [];

                // A missing, non-integer, or non-positive timestamp is replaced with the current time.
                $serverTime = filter_var($decoded['server_time'] ?? time(), FILTER_VALIDATE_INT);
                if ($serverTime === false || $serverTime <= 0) {
                    $serverTime = time();
                }

                $out[] = [
                    'topic' => $topic,
                    'event' => $event,
                    'data' => $data,
                    'server_time' => $serverTime,
                ];
            }
        } catch (Throwable $e) {
            // Entries already popped are gone from the queue, so hand them back instead of
            // dropping them; log the failure the same way publish() does.
            Log::warning('ws event bus pop failed', [
                'popped' => count($out),
                'error' => $e->getMessage(),
            ]);
        }

        return $out;
    }
}