1.ws优化订阅,修复中大奖没有推送

This commit is contained in:
2026-05-27 10:40:55 +08:00
parent 8f5ba977a4
commit a96aa0fb41
3 changed files with 241 additions and 13 deletions

View File

@@ -33,6 +33,11 @@ final class GameLiveService
/** period.tick 边界帧去重finished / void 每期只推一次TTL 兼顾跨进程与跨期重启 */
private const TICK_BOUNDARY_DEDUP_KEY_PREFIX = 'dfw:v1:ws:tick:boundary:';
private const TICK_BOUNDARY_DEDUP_TTL_SECONDS = 300;
/** 开奖/派彩阶段公共推送去重(与 bet.win 分离recover 补偿路径也会写) */
private const PERIOD_OPENED_NOTIFY_DEDUP_PREFIX = 'dfw:v1:ws:period_opened:';
private const PERIOD_PAYOUT_NOTIFY_DEDUP_PREFIX = 'dfw:v1:ws:period_payout:';
private const PERIOD_DRAW_NOTIFY_DEDUP_TTL = 86400;
private const KEY_PERIOD_SECONDS = 'period_seconds';
private const KEY_BET_SECONDS = 'bet_seconds';
private const KEY_PAYOUT_SECONDS = 'payout_seconds';
@@ -156,13 +161,23 @@ final class GameLiveService
return;
}
$periodNo = is_string($row['period_no'] ?? null) ? (string) $row['period_no'] : '';
$drawMode = filter_var($row['draw_mode'] ?? 0, FILTER_VALIDATE_INT);
GameBetSettleService::publishSettlementWinsAfterCommit(
$settleOut,
$recordId,
is_string($row['period_no'] ?? null) ? (string) $row['period_no'] : '',
$periodNo,
(int) $resultNumber
);
GameBetSettleService::ensurePeriodBetWinNotifications($recordId, (int) $resultNumber);
self::ensurePeriodDrawNotifications(
$recordId,
$periodNo,
(int) $resultNumber,
$drawMode === false ? 0 : $drawMode,
$payoutUntil > 0 ? $payoutUntil : $now + self::getPayoutGraceSeconds(),
$now
);
GameHotDataCoordinator::afterGameRecordCommitted($recordId);
self::publishSnapshot(null);
@@ -750,6 +765,16 @@ final class GameLiveService
$payoutUntil,
$now
): void {
// 开奖号已写入 DB先推 period.opened / period.payout再结算避免 settle 异常导致永远收不到开奖推送)
self::ensurePeriodDrawNotifications($rid, $periodNo, $finalNumber, $drawMode, $payoutUntil, $now);
$settleOut = [
'jackpot_hits' => [],
'bet_wins' => [],
'user_streak_events' => [],
'wallet_events' => [],
'settled_order_count' => 0,
];
try {
$settleOut = GameBetSettleService::settleBetsForDraw($rid, $finalNumber);
} catch (Throwable $e) {
@@ -757,8 +782,6 @@ final class GameLiveService
'record_id' => $rid,
'error' => $e->getMessage(),
]);
return;
}
GameBetSettleService::publishSettlementWinsAfterCommit(
$settleOut,
@@ -766,13 +789,12 @@ final class GameLiveService
$periodNo,
$finalNumber
);
GameBetSettleService::ensurePeriodBetWinNotifications($rid, $finalNumber);
GameHotDataCoordinator::afterGameRecordCommitted($rid);
try {
GameRecordStatService::refreshForRecordId($rid);
} catch (Throwable) {
}
self::publishPublicPeriodOpened($periodNo, $finalNumber, $drawMode, $now);
self::publishPublicPeriodPayout($rid, $periodNo, $finalNumber, $payoutUntil, $now);
$jackpotHits = is_array($settleOut['jackpot_hits'] ?? null) ? $settleOut['jackpot_hits'] : [];
GameWebSocketEventBus::publish('admin.live.opened', [
'period_id' => $rid,
@@ -1550,6 +1572,92 @@ final class GameLiveService
]);
}
/**
* 开奖后推送 period.opened + period.payout每期各至多一次入队成功后才写 dedup
* recover / drawResult 补偿路径均会调用,避免仅 recover 结算时漏推 period.opened。
*/
public static function ensurePeriodDrawNotifications(
int $periodId,
string $periodNo,
int $resultNumber,
int $drawMode,
int $payoutUntil,
int $openTime
): void {
if ($periodId <= 0 || $periodNo === '' || $resultNumber < 1) {
return;
}
$openedKey = self::PERIOD_OPENED_NOTIFY_DEDUP_PREFIX . $periodId;
$payoutKey = self::PERIOD_PAYOUT_NOTIFY_DEDUP_PREFIX . $periodId;
if (!self::hasPeriodNotifyMarked($openedKey)) {
$ok = GameWebSocketEventBus::publish(self::EVT_PERIOD_OPENED, [
'period_no' => $periodNo,
'result_number' => $resultNumber,
'draw_mode' => $drawMode,
'open_time' => $openTime,
]);
if ($ok) {
self::markPeriodNotifyOnce($openedKey);
Log::channel('ws')->info('period.opened published', [
'period_id' => $periodId,
'period_no' => $periodNo,
'result_number' => $resultNumber,
]);
} else {
Log::channel('ws')->warning('period.opened publish failed', [
'period_id' => $periodId,
'period_no' => $periodNo,
]);
}
}
if (!self::hasPeriodNotifyMarked($payoutKey)) {
$grace = self::getPayoutGraceSeconds();
$remaining = max(0, $payoutUntil - $openTime);
$ok = GameWebSocketEventBus::publish(self::EVT_PERIOD_PAYOUT, [
'period_id' => $periodId,
'period_no' => $periodNo,
'result_number' => $resultNumber,
'payout_until' => $payoutUntil,
'payout_seconds' => $grace,
'payout_remaining_seconds' => $remaining,
'server_time' => $openTime,
]);
if ($ok) {
self::markPeriodNotifyOnce($payoutKey);
Log::channel('ws')->info('period.payout published', [
'period_id' => $periodId,
'period_no' => $periodNo,
]);
} else {
Log::channel('ws')->warning('period.payout publish failed', [
'period_id' => $periodId,
'period_no' => $periodNo,
]);
}
}
}
private static function hasPeriodNotifyMarked(string $key): bool
{
try {
$existing = Redis::get($key);
return $existing !== false && $existing !== null && $existing !== '';
} catch (Throwable) {
return false;
}
}
private static function markPeriodNotifyOnce(string $key): void
{
try {
Redis::setEx($key, self::PERIOD_DRAW_NOTIFY_DEDUP_TTL, '1');
} catch (Throwable) {
}
}
private static function publishPublicPeriodOpened(string $periodNo, int $resultNumber, int $drawMode, int $openTime): void
{
GameWebSocketEventBus::publish(self::EVT_PERIOD_OPENED, [

View File

@@ -114,27 +114,43 @@ foreach ($records as $gr) {
continue;
}
echo " -- bet.win dedup keys --\n";
foreach (array_keys($winUserIds) as $uid) {
$key = 'dfw:v1:ws:betwin:' . $pid . ':' . $uid;
echo " -- draw / bet.win dedup keys --\n";
foreach (['dfw:v1:ws:period_opened:' . $pid => 'period.opened', 'dfw:v1:ws:period_payout:' . $pid => 'period.payout'] as $key => $label) {
try {
$val = Redis::get($key);
$ttl = Redis::ttl($key);
} catch (\Throwable $e) {
} catch (\Throwable) {
$val = false;
$ttl = -2;
}
$exists = ($val !== false && $val !== null && $val !== '');
echo sprintf(
" %s user=%d exists=%s ttl=%s %s\n",
$key,
" %s exists=%s ttl=%s %s\n",
$label,
$exists ? 'YES' : 'NO',
(string) $ttl,
$exists ? '(已推送)' : '(未推送)'
);
}
foreach (array_keys($winUserIds) as $uid) {
$key = 'dfw:v1:ws:betwin:' . $pid . ':' . $uid;
try {
$val = Redis::get($key);
$ttl = Redis::ttl($key);
} catch (\Throwable) {
$val = false;
$ttl = -2;
}
$exists = ($val !== false && $val !== null && $val !== '');
echo sprintf(
" bet.win user=%d exists=%s ttl=%s %s\n",
$uid,
$exists ? 'YES' : 'NO',
(string) $ttl,
$exists ? '(已推送)' : '(未推送 → 重启 webman 后用 republish_bet_win.php 补发)'
$exists ? '(已推送)' : '(未推送)'
);
}
echo "\n";
}
echo "如需补发php scripts/republish_bet_win.php --period-no=<no> --force\n";
echo "补发php scripts/republish_period_draw.php --period-no=<no> --force\n";

View File

@@ -0,0 +1,104 @@
<?php
declare(strict_types=1);
/**
* 补发本期开奖推送period.opened + period.payout + bet.win已结算中奖用户
*
* 用法(在生产服务器执行,需 Redis 与 gameWebSocketServer 同机):
* php scripts/republish_period_draw.php --period-no=20260527-103109-7fbbbb5e
* php scripts/republish_period_draw.php --play-record-id=1388
* php scripts/republish_period_draw.php --period-id=74992 --force
*/
require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/../support/bootstrap.php';
use app\common\service\GameBetSettleService;
use app\common\service\GameLiveService;
use support\Redis;
use support\think\Db;
$opts = getopt('', ['period-id:', 'play-record-id:', 'period-no:', 'force']);
$force = array_key_exists('force', $opts);
$periodId = isset($opts['period-id']) ? (int) $opts['period-id'] : 0;
if (isset($opts['play-record-id'])) {
$playId = (int) $opts['play-record-id'];
if ($playId > 0) {
$pid = Db::name('game_play_record')->where('id', $playId)->value('period_id');
$periodId = is_numeric((string) $pid) ? (int) $pid : 0;
echo "play_record_id={$playId} => period_id={$periodId}" . PHP_EOL;
}
}
if ($periodId <= 0 && isset($opts['period-no'])) {
$periodNo = trim((string) $opts['period-no']);
if ($periodNo !== '') {
$pid = Db::name('game_record')->where('period_no', $periodNo)->value('id');
$periodId = is_numeric((string) $pid) ? (int) $pid : 0;
echo "period_no={$periodNo} => period_id={$periodId}" . PHP_EOL;
}
}
if ($periodId <= 0) {
fwrite(STDERR, "请指定 --period-id、--play-record-id 或 --period-no\n");
exit(1);
}
$row = Db::name('game_record')->where('id', $periodId)->find();
if (!is_array($row)) {
fwrite(STDERR, "对局不存在: period_id={$periodId}\n");
exit(1);
}
$resultNumber = filter_var($row['result_number'] ?? 0, FILTER_VALIDATE_INT);
if ($resultNumber === false || $resultNumber < 1) {
fwrite(STDERR, "对局尚未开奖,无法补发\n");
exit(1);
}
$periodNo = (string) ($row['period_no'] ?? '');
$drawMode = filter_var($row['draw_mode'] ?? 0, FILTER_VALIDATE_INT);
$payoutUntil = filter_var($row['payout_until'] ?? 0, FILTER_VALIDATE_INT);
$openTime = filter_var($row['update_time'] ?? time(), FILTER_VALIDATE_INT);
if ($openTime === false || $openTime <= 0) {
$openTime = time();
}
if ($force) {
try {
Redis::del('dfw:v1:ws:period_opened:' . $periodId);
Redis::del('dfw:v1:ws:period_payout:' . $periodId);
} catch (Throwable) {
}
$payloads = GameBetSettleService::buildBetWinPayloadsFromSettledOrders($periodId, $resultNumber);
foreach ($payloads as $payload) {
$uid = (int) ($payload['user_id'] ?? 0);
if ($uid > 0) {
try {
Redis::del('dfw:v1:ws:betwin:' . $periodId . ':' . $uid);
} catch (Throwable) {
}
}
}
echo "已清除 dedup 键(--force\n";
}
GameLiveService::ensurePeriodDrawNotifications(
$periodId,
$periodNo,
$resultNumber,
$drawMode === false ? 0 : $drawMode,
$payoutUntil !== false && $payoutUntil > 0 ? $payoutUntil : time(),
$openTime
);
$payloads = GameBetSettleService::buildBetWinPayloadsFromSettledOrders($periodId, $resultNumber);
if ($payloads !== []) {
GameBetSettleService::publishBetWinsAfterCommit($payloads, $periodId);
echo 'bet.win republished for user_ids: ' . implode(',', array_map(static fn (array $p): int => (int) ($p['user_id'] ?? 0), $payloads)) . PHP_EOL;
} else {
echo "本期无已结算中奖注单。\n";
}
echo "done period_id={$periodId} period_no={$periodNo} result={$resultNumber}\n";