$data */
    public static function publish(string $topic, array $data): void
    {
        // Topics that are blank after trimming are dropped without queueing anything.
        $name = trim($topic);
        if ($name === '') {
            return;
        }

        // The trimmed topic doubles as the event name; server_time is the
        // Unix timestamp taken at enqueue time.
        $encoded = json_encode(
            [
                'topic' => $name,
                'event' => $name,
                'data' => $data,
                'server_time' => time(),
            ],
            JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES,
        );

        // json_encode() yields false on failure; an unusable payload is dropped.
        if (!is_string($encoded) || $encoded === '') {
            return;
        }

        try {
            Redis::lPush(self::KEY_QUEUE, $encoded);
        } catch (Throwable) {
            // Any error raised by the push is swallowed so it never reaches the caller.
        }
    }

    /**
     * Pops up to $limit entries from the list at self::KEY_QUEUE and returns the
     * ones that decode into a valid event.
     *
     * Entries are taken with Redis::rPop() while publish() adds them with
     * Redis::lPush(), so (assuming the facade follows Redis LPUSH/RPOP semantics)
     * the oldest queued entry comes out first.
     *
     * Reading stops early when a pop yields anything other than a non-empty
     * string. Entries that are not JSON arrays, or whose topic is missing or
     * blank, are discarded but still count towards $limit. Any Throwable raised
     * while draining ends the batch; whatever was collected so far is returned.
     *
     * @param int $limit Maximum number of pops; values <= 0 return an empty list,
     *                   values above self::MAX_BATCH are capped to it.
     *
     * @return list<array{topic:string,event:string,data:array<array-key,mixed>,server_time:int}>
     */
    public static function popBatch(int $limit = self::MAX_BATCH): array
    {
        if ($limit <= 0) {
            return [];
        }

        $limit = min($limit, self::MAX_BATCH);
        $events = [];

        try {
            for ($remaining = $limit; $remaining > 0; $remaining--) {
                $item = Redis::rPop(self::KEY_QUEUE);
                if (!is_string($item) || $item === '') {
                    // Nothing usable came back: stop reading.
                    break;
                }

                $fields = json_decode($item, true);
                if (!is_array($fields)) {
                    continue;
                }

                // A non-blank string topic is mandatory.
                $topic = $fields['topic'] ?? '';
                if (!is_string($topic)) {
                    continue;
                }
                $topic = trim($topic);
                if ($topic === '') {
                    continue;
                }

                // The event name falls back to the topic when absent, blank or not a string.
                $event = $fields['event'] ?? '';
                $event = is_string($event) ? trim($event) : '';
                if ($event === '') {
                    $event = $topic;
                }

                $data = $fields['data'] ?? [];
                if (!is_array($data)) {
                    $data = [];
                }

                // Missing, non-integer or non-positive timestamps are replaced with the current time.
                $stamp = filter_var($fields['server_time'] ?? time(), FILTER_VALIDATE_INT);
                $serverTime = is_int($stamp) && $stamp > 0 ? $stamp : time();

                $events[] = [
                    'topic' => $topic,
                    'event' => $event,
                    'data' => $data,
                    'server_time' => $serverTime,
                ];
            }
        } catch (Throwable) {
            // Fall through: the partial batch gathered before the failure is still returned.
        }

        return $events;
    }
}