$data
     * @return bool Whether the event was enqueued. false is returned when the topic is empty,
     *              the payload cannot be JSON-encoded, or the Redis push fails; callers should
     *              then avoid marking the event as "pushed".
     */
    public static function publish(string $topic, array $data): bool
    {
        $topic = trim($topic);
        if ($topic === '') {
            return false;
        }

        // 'event' is written with the same value as 'topic'; popBatch() falls back to
        // the topic when 'event' is missing, so both keys stay in the envelope.
        $payload = [
            'topic' => $topic,
            'event' => $topic,
            'data' => $data,
            'server_time' => time(),
        ];

        $json = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        if (!is_string($json) || $json === '') {
            Log::channel('ws')->warning('publish skip: invalid json payload', [
                'topic' => $topic,
                // Record why encoding failed (e.g. malformed UTF-8) so the skip is diagnosable.
                'json_error' => json_last_error_msg(),
            ]);

            return false;
        }

        try {
            // lPush at the head + rPop at the tail (see popBatch) gives FIFO ordering.
            $len = Redis::lPush(self::KEY_QUEUE, $json);
            $ok = is_numeric($len) && (int) $len > 0;

            if ($ok) {
                // user_id is extracted for logging only; 0 when absent or not an integer.
                $uid = filter_var($data['user_id'] ?? 0, FILTER_VALIDATE_INT);
                Log::channel('ws')->info('publish', [
                    'topic' => $topic,
                    'user_id' => $uid === false ? 0 : (int) $uid,
                    'queue_len_after' => (int) $len,
                    'payload_size' => strlen($json),
                ]);
            } else {
                Log::channel('ws')->warning('publish lpush returned non-positive', [
                    'topic' => $topic,
                    'returned' => $len,
                ]);
            }

            return $ok;
        } catch (Throwable $e) {
            // Logged on the dedicated 'ws' channel and on the default channel.
            Log::channel('ws')->error('publish failed (redis exception)', [
                'topic' => $topic,
                'error' => $e->getMessage(),
            ]);
            Log::warning('ws event bus publish failed', [
                'topic' => $topic,
                'error' => $e->getMessage(),
            ]);

            return false;
        }
    }

    /**
     * Pops up to $limit events from the tail of the queue and normalizes each one.
     *
     * Entries that cannot be decoded to an array, or that carry no usable topic, are
     * dropped with a warning log (they have already been removed from Redis). If Redis
     * throws part-way through, the events popped so far are still returned.
     *
     * @param int $limit Maximum number of entries to pop; values <= 0 yield an empty list
     *                   and values above self::MAX_BATCH are clamped to it.
     * @return list<array{topic:string,event:string,data:array,server_time:int}>
     */
    public static function popBatch(int $limit = self::MAX_BATCH): array
    {
        if ($limit <= 0) {
            return [];
        }
        $limit = min($limit, self::MAX_BATCH);

        $out = [];
        try {
            for ($i = 0; $i < $limit; $i++) {
                $raw = Redis::rPop(self::KEY_QUEUE);
                if (!is_string($raw)) {
                    // A non-string reply is taken to mean the list is empty
                    // (presumably false/null depending on the Redis client).
                    break;
                }

                // An empty string is a real (malformed) element, not "queue empty":
                // it falls through to the decode failure path instead of ending the batch.
                $decoded = json_decode($raw, true);
                if (!is_array($decoded)) {
                    Log::channel('ws')->warning('popBatch drop: undecodable payload', [
                        'payload_size' => strlen($raw),
                        'json_error' => json_last_error_msg(),
                    ]);
                    continue;
                }

                $topicRaw = $decoded['topic'] ?? '';
                if (!is_string($topicRaw) || trim($topicRaw) === '') {
                    Log::channel('ws')->warning('popBatch drop: missing topic', [
                        'payload_size' => strlen($raw),
                    ]);
                    continue;
                }
                $topic = trim($topicRaw);

                // 'event' falls back to the topic when absent, blank, or not a string.
                $eventRaw = $decoded['event'] ?? '';
                $event = is_string($eventRaw) && trim($eventRaw) !== '' ? trim($eventRaw) : $topic;

                $dataRaw = $decoded['data'] ?? [];
                $data = is_array($dataRaw) ? $dataRaw : [];

                // A missing, non-integer, or non-positive timestamp is replaced by "now".
                $serverTime = filter_var($decoded['server_time'] ?? time(), FILTER_VALIDATE_INT);
                if ($serverTime === false || $serverTime <= 0) {
                    $serverTime = time();
                }

                $out[] = [
                    'topic' => $topic,
                    'event' => $event,
                    'data' => $data,
                    'server_time' => $serverTime,
                ];
            }
        } catch (Throwable $e) {
            // Best effort: keep whatever was popped before the failure.
            Log::channel('ws')->error('popBatch failed (redis exception)', [
                'error' => $e->getMessage(),
                'popped' => count($out),
            ]);
        }

        return $out;
    }

    /**
     * Current backlog length of the queue (for monitoring).
     *
     * @return int The list length; 0 when the Redis reply is not numeric, and -1 when
     *             the Redis call throws.
     */
    public static function queueLength(): int
    {
        try {
            $len = Redis::lLen(self::KEY_QUEUE);

            return is_numeric($len) ? (int) $len : 0;
        } catch (Throwable) {
            // -1 is the sentinel for "Redis call failed", distinct from an empty queue (0).
            return -1;
        }
    }
}