1.增加互斥锁:保证缓存和数据库数据一致性

2.增加消费队列,保证mysql数据的正常保存
This commit is contained in:
2026-04-20 14:13:48 +08:00
parent 614fb00ec4
commit 1eed3cf0f7
23 changed files with 836 additions and 255 deletions

View File

@@ -131,7 +131,7 @@ final class GameBetSettleService
'current_streak' => $next,
'update_time' => $now,
]);
GameHotDataRedis::userForget($userId);
GameHotDataCoordinator::afterUserCommitted($userId);
}
foreach ($aggregateByUser as $userId => $agg) {
@@ -273,7 +273,7 @@ final class GameBetSettleService
'bet_flow_coin' => Db::raw('bet_flow_coin + ' . $flow),
'update_time' => $now,
]);
GameHotDataRedis::userForget($userId);
GameHotDataCoordinator::afterUserCommitted($userId);
}
/**
@@ -321,7 +321,7 @@ final class GameBetSettleService
'coin' => $after,
'update_time' => $now,
]);
GameHotDataRedis::userForget($userId);
GameHotDataCoordinator::afterUserCommitted($userId);
return $after;
}

View File

@@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
namespace app\common\service;
/**
* 缓存与队列统一入口:落库后先同步回源 Redis再入队幂等任务削峰。
*/
final class GameHotDataCoordinator
{
public static function afterUserCommitted(int $userId): void
{
if ($userId <= 0) {
return;
}
GameHotDataRedis::userReplaceCacheFromDb($userId);
GameHotDataWriteQueue::enqueue([
'op' => GameHotDataWriteQueue::OP_USER_REFRESH,
'id' => $userId,
]);
}
public static function afterGameConfigKeyCommitted(string $configKey): void
{
if ($configKey === '') {
return;
}
GameHotDataRedis::gameConfigReplaceFromDb($configKey);
GameHotDataWriteQueue::enqueue([
'op' => GameHotDataWriteQueue::OP_GC_REFRESH,
'key' => $configKey,
]);
}
/**
* @param int|null $recordId 有 id 时刷新该行并清除活跃/最新聚合键null 时仅清除聚合键
*/
public static function afterGameRecordCommitted(?int $recordId): void
{
GameHotDataRedis::gameRecordSyncCachesAfterDbWrite($recordId);
GameHotDataWriteQueue::enqueue([
'op' => GameHotDataWriteQueue::OP_GR_SYNC,
'id' => $recordId,
]);
}
}

View File

@@ -0,0 +1,132 @@
<?php
declare(strict_types=1);
namespace app\common\service;
use support\Redis;
use Throwable;
/**
* 热点实体互斥锁Redis SET NX EX按资源串行化写入避免多管理员/多进程同时改同一缓存键对应的数据。
*/
final class GameHotDataLock
{
public const TYPE_USER = 'user';
public const TYPE_GAME_CONFIG = 'gc';
public const TYPE_GAME_RECORD = 'gr';
private const KEY_PREFIX = 'dfw:v1:lock:mut:';
/**
* @return array{acquired: bool, token: ?string, redis_lock: bool}
*/
public static function tryAcquire(string $type, string $resourceKey): array
{
if ($resourceKey === '') {
return ['acquired' => false, 'token' => null, 'redis_lock' => false];
}
$lockKey = self::lockKey($type, $resourceKey);
$ttl = self::lockTtl();
$token = bin2hex(random_bytes(16));
try {
$client = Redis::connection()->client();
if (!is_object($client) || !method_exists($client, 'set')) {
return ['acquired' => true, 'token' => null, 'redis_lock' => false];
}
$ok = $client->set($lockKey, $token, ['nx', 'ex' => $ttl]);
if ($ok === true) {
return ['acquired' => true, 'token' => $token, 'redis_lock' => true];
}
} catch (Throwable) {
return ['acquired' => true, 'token' => null, 'redis_lock' => false];
}
return ['acquired' => false, 'token' => null, 'redis_lock' => true];
}
/**
* 带短等待的重试(毫秒),用于对局写入与 ensureAiLocked 等可能交叉的路径。
*
* @return array{acquired: bool, token: ?string, redis_lock: bool}
*/
public static function tryAcquireWithWait(string $type, string $resourceKey, int $maxWaitMs = 800): array
{
$deadline = (int) (microtime(true) * 1000) + max(0, $maxWaitMs);
while (true) {
$r = self::tryAcquire($type, $resourceKey);
if ($r['acquired']) {
return $r;
}
$now = (int) (microtime(true) * 1000);
if ($now >= $deadline) {
return $r;
}
usleep(25_000);
}
}
public static function release(string $type, string $resourceKey, ?string $token, bool $redisLock): void
{
if ($resourceKey === '' || !$redisLock || $token === null || $token === '') {
return;
}
$key = self::lockKey($type, $resourceKey);
$script = <<<'LUA'
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
LUA;
try {
$client = Redis::connection()->client();
if (is_object($client) && method_exists($client, 'eval')) {
$client->eval($script, [$key, $token], 1);
}
} catch (Throwable) {
}
}
/**
* @template T
* @param callable(): T $fn
* @return T|null
*/
public static function runExclusive(string $type, string $resourceKey, callable $fn): mixed
{
$lock = self::tryAcquire($type, $resourceKey);
if (!$lock['acquired']) {
return null;
}
try {
return $fn();
} finally {
self::release($type, $resourceKey, $lock['token'], $lock['redis_lock']);
}
}
public static function safeResourceKeyForConfig(string $configKey): string
{
return rtrim(strtr(base64_encode($configKey), '+/', '-_'), '=');
}
public static function lockKeyFromConfigKey(string $configKey): string
{
return self::lockKey(self::TYPE_GAME_CONFIG, self::safeResourceKeyForConfig($configKey));
}
private static function lockKey(string $type, string $resourceKey): string
{
return self::KEY_PREFIX . $type . ':' . $resourceKey;
}
private static function lockTtl(): int
{
$v = config('game_hot_cache.admin_user_mutation_lock_ttl', 30);
$n = filter_var($v, FILTER_VALIDATE_INT);
return ($n === false || $n < 5) ? 30 : $n;
}
}

View File

@@ -66,6 +66,45 @@ final class GameHotDataRedis
self::redisDel(self::KEY_GC . $configKey);
}
/**
* 按库中当前行覆盖 game_config 缓存(无行则删缓存)
*/
public static function gameConfigReplaceFromDb(string $configKey): void
{
if ($configKey === '' || !self::enabled()) {
return;
}
$row = Db::name('game_config')->where('config_key', $configKey)->find();
if (!$row) {
self::gameConfigForget($configKey);
return;
}
$ttl = self::intConfig('ttl_game_config', 86400);
self::redisSetEx(self::KEY_GC . $configKey, $ttl, json_encode($row, JSON_UNESCAPED_UNICODE));
}
/**
* 对局写入后:刷新指定 id 的行缓存,并删除「活跃局 / 最新局」聚合键以免脏读
*
* @param int|null $id 可为 null仅清聚合键
*/
public static function gameRecordSyncCachesAfterDbWrite(?int $id): void
{
if (!self::enabled()) {
return;
}
if ($id !== null && $id > 0) {
$row = Db::name('game_record')->where('id', $id)->find();
if ($row) {
$ttl = self::intConfig('ttl_game_record', 60);
self::redisSetEx(self::KEY_GR_ID . $id, $ttl, json_encode($row, JSON_UNESCAPED_UNICODE));
} else {
self::redisDel(self::KEY_GR_ID . $id);
}
}
self::redisDel(self::KEY_GR_ACTIVE, self::KEY_GR_LATEST);
}
/**
* @return array<string, mixed>|null
*/
@@ -212,6 +251,49 @@ final class GameHotDataRedis
self::redisDel(self::KEY_USER . $userId);
}
/**
* 从数据库读取最新 user 行并覆盖写入 Redis与 DB 同事务后调用,保持缓存与库一致)
*/
public static function userReplaceCacheFromDb(int $userId): void
{
if ($userId <= 0 || !self::enabled()) {
return;
}
$row = Db::name('user')->where('id', $userId)->find();
if (!$row) {
self::userForget($userId);
return;
}
$ttl = self::intConfig('ttl_user', 90);
self::redisSetEx(self::KEY_USER . $userId, $ttl, json_encode($row, JSON_UNESCAPED_UNICODE));
}
/**
* 尝试获取「后台修改该用户」互斥锁Redis SET NX EX
* 与热点缓存开关无关:只要 Redis 可用即加锁连接失败时降级为仅依赖数据库乐观锁WHERE coin=)。
*
* @return array{acquired: bool, token: ?string, redis_lock: bool}
*/
public static function userAdminMutationLockTry(int $userId): array
{
if ($userId <= 0) {
return ['acquired' => false, 'token' => null, 'redis_lock' => false];
}
return GameHotDataLock::tryAcquire(GameHotDataLock::TYPE_USER, (string) $userId);
}
/**
* 释放 userAdminMutationLockTry 取得的锁(仅当 redis_lock 且 token 非空)
*/
public static function userAdminMutationLockRelease(int $userId, ?string $token, bool $redisLock): void
{
if ($userId <= 0) {
return;
}
GameHotDataLock::release(GameHotDataLock::TYPE_USER, (string) $userId, $token, $redisLock);
}
/**
* 用缓存行构造已存在库的 User供 Auth 等高频读)
*/

View File

@@ -0,0 +1,66 @@
<?php
declare(strict_types=1);
namespace app\common\service;
use support\Redis;
use Throwable;
/**
* 热点缓存回源刷新队列LPUSH / RPOP由独立进程消费削峰 Redis 与 DB 回读。
* 业务线程在落库后仍会同步刷新缓存;队列任务为幂等补偿,避免瞬时高峰打满连接。
*/
final class GameHotDataWriteQueue
{
public const OP_USER_REFRESH = 'user.refresh';
public const OP_GC_REFRESH = 'gc.refresh';
public const OP_GR_SYNC = 'gr.sync';
public static function queueListKey(): string
{
$k = config('game_hot_cache.queue_list_key', 'dfw:q:hot_data_write');
return is_string($k) && $k !== '' ? $k : 'dfw:q:hot_data_write';
}
public static function enabled(): bool
{
return filter_var(config('game_hot_cache.enable_cache_write_queue', true), FILTER_VALIDATE_BOOLEAN);
}
/**
* @param array<string, mixed> $job 须含 op 字段
*/
public static function enqueue(array $job): void
{
if (!self::enabled()) {
return;
}
if (!isset($job['op']) || !is_string($job['op']) || $job['op'] === '') {
return;
}
$job['v'] = 1;
$job['ts'] = time();
$max = self::maxQueueLength();
try {
if ($max > 0) {
$len = Redis::lLen(self::queueListKey());
if (is_int($len) && $len >= $max) {
Redis::rPop(self::queueListKey());
}
}
Redis::lPush(self::queueListKey(), json_encode($job, JSON_UNESCAPED_UNICODE));
} catch (Throwable) {
}
}
public static function maxQueueLength(): int
{
$v = config('game_hot_cache.queue_max_length', 50000);
$n = filter_var($v, FILTER_VALIDATE_INT);
return ($n === false || $n < 0) ? 50000 : $n;
}
}

View File

@@ -5,6 +5,8 @@ declare(strict_types=1);
namespace app\common\service;
use app\common\library\game\StreakWinReward;
use app\common\service\GameHotDataCoordinator;
use app\common\service\GameHotDataLock;
use support\think\Db;
use Throwable;
use Webman\Push\Api;
@@ -253,13 +255,22 @@ final class GameLiveService
return ['ok' => false, 'msg' => __('This period has ended; please refresh the page')];
}
self::ensureAiLocked((int) $record['id']);
Db::name('game_record')->where('id', (int) $record['id'])->update([
'pending_draw_number' => $manualNumber,
'update_time' => time(),
]);
GameHotDataRedis::gameRecordForget((int) $record['id']);
self::publishSnapshot(null);
$rid = (int) $record['id'];
$lock = GameHotDataLock::tryAcquireWithWait(GameHotDataLock::TYPE_GAME_RECORD, (string) $rid, 1200);
if (!$lock['acquired']) {
return ['ok' => false, 'msg' => __('Another operation is in progress for this period; please try again later')];
}
try {
self::ensureAiLocked($rid);
Db::name('game_record')->where('id', $rid)->update([
'pending_draw_number' => $manualNumber,
'update_time' => time(),
]);
GameHotDataCoordinator::afterGameRecordCommitted($rid);
self::publishSnapshot(null);
} finally {
GameHotDataLock::release(GameHotDataLock::TYPE_GAME_RECORD, (string) $rid, $lock['token'], $lock['redis_lock']);
}
return [
'ok' => true,
@@ -289,8 +300,14 @@ final class GameLiveService
return ['ok' => false, 'msg' => __('Period countdown has not ended; cannot draw yet')];
}
self::ensureAiLocked((int) $record['id']);
$record = self::reloadRecord((int) $record['id']);
$rid = (int) $record['id'];
$lock = GameHotDataLock::tryAcquireWithWait(GameHotDataLock::TYPE_GAME_RECORD, (string) $rid, 2000);
if (!$lock['acquired']) {
return ['ok' => false, 'msg' => __('Another operation is in progress for this period; please try again later')];
}
try {
self::ensureAiLocked($rid);
$record = self::reloadRecord($rid);
if (!$record) {
return ['ok' => false, 'msg' => __('No active game in progress')];
}
@@ -346,10 +363,10 @@ final class GameLiveService
return ['ok' => false, 'msg' => __('Game live: settlement error') . ': ' . $e->getMessage()];
}
GameHotDataRedis::gameRecordForget((int) $record['id']);
GameHotDataCoordinator::afterGameRecordCommitted($rid);
try {
GameRecordStatService::refreshForRecordId((int) $record['id']);
GameRecordStatService::refreshForRecordId($rid);
} catch (Throwable) {
}
JackpotPushService::publishHits($settleOut['jackpot_hits'] ?? []);
@@ -365,6 +382,9 @@ final class GameLiveService
'estimated_loss' => $finalLoss,
'payout_until' => $payoutUntil,
];
} finally {
GameHotDataLock::release(GameHotDataLock::TYPE_GAME_RECORD, (string) $rid, $lock['token'], $lock['redis_lock']);
}
}
/**
@@ -382,22 +402,30 @@ final class GameLiveService
return;
}
$id = (int) $row['id'];
Db::startTrans();
try {
Db::name('game_record')->where('id', $id)->update([
'status' => 4,
'payout_until' => null,
'update_time' => time(),
]);
GameRecordService::createNextRecordAfterDraw();
Db::commit();
} catch (Throwable) {
Db::rollback();
$lock = GameHotDataLock::tryAcquireWithWait(GameHotDataLock::TYPE_GAME_RECORD, (string) $id, 2000);
if (!$lock['acquired']) {
return;
}
GameHotDataRedis::gameRecordForget($id);
GameRecordStatService::refreshForRecordId($id);
self::publishSnapshot(null);
try {
Db::startTrans();
try {
Db::name('game_record')->where('id', $id)->update([
'status' => 4,
'payout_until' => null,
'update_time' => time(),
]);
GameRecordService::createNextRecordAfterDraw();
Db::commit();
} catch (Throwable) {
Db::rollback();
return;
}
GameHotDataCoordinator::afterGameRecordCommitted($id);
GameRecordStatService::refreshForRecordId($id);
self::publishSnapshot(null);
} finally {
GameHotDataLock::release(GameHotDataLock::TYPE_GAME_RECORD, (string) $id, $lock['token'], $lock['redis_lock']);
}
}
public static function tickAutoDraw(): void
@@ -609,7 +637,7 @@ final class GameLiveService
'status' => 1,
'update_time' => time(),
]);
GameHotDataRedis::gameRecordForget($recordId);
GameHotDataCoordinator::afterGameRecordCommitted($recordId);
$record['status'] = 1;
self::publishPublicPeriodLocked($record);
}
@@ -629,7 +657,7 @@ final class GameLiveService
$update['status'] = 1;
}
Db::name('game_record')->where('id', $recordId)->update($update);
GameHotDataRedis::gameRecordForget($recordId);
GameHotDataCoordinator::afterGameRecordCommitted($recordId);
$record = array_merge($record, $update);
if ($st === 0) {
self::publishPublicPeriodLocked($record);

View File

@@ -99,7 +99,7 @@ final class GameRecordService
'create_time' => $now,
'update_time' => $now,
]);
GameHotDataRedis::gameRecordForget();
GameHotDataCoordinator::afterGameRecordCommitted(null);
return $periodNo;
}
@@ -123,7 +123,7 @@ final class GameRecordService
'remark' => $remark,
'update_time' => $now,
]);
GameHotDataRedis::gameConfigForget($key);
GameHotDataCoordinator::afterGameConfigKeyCommitted($key);
return;
}
Db::name('game_config')->insert([
@@ -134,6 +134,6 @@ final class GameRecordService
'create_time' => $now,
'update_time' => $now,
]);
GameHotDataRedis::gameConfigForget($key);
GameHotDataCoordinator::afterGameConfigKeyCommitted($key);
}
}

View File

@@ -33,7 +33,7 @@ final class GameRecordStatService
'winner_user_count' => 0,
'update_time' => $now,
]);
GameHotDataRedis::gameRecordForget($recordId);
GameHotDataCoordinator::afterGameRecordCommitted($recordId);
return;
}
@@ -79,7 +79,7 @@ final class GameRecordStatService
'winner_user_count' => count($winnerUserIds),
'update_time' => $now,
]);
GameHotDataRedis::gameRecordForget($recordId);
GameHotDataCoordinator::afterGameRecordCommitted($recordId);
}
/**