Files
webman-buildadmin/app/process/GameHotDataQueueConsumer.php
zhenhui 1eed3cf0f7 1.增加互斥锁:保证缓存和数据库数据一致性
2.增加消费队列,保证mysql数据的正常保存
2026-04-20 14:13:48 +08:00

84 lines
2.8 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace app\process;
use app\common\service\GameHotDataRedis;
use app\common\service\GameHotDataWriteQueue;
use support\Redis;
use Throwable;
use Workerman\Timer;
/**
* 消费热点缓存回源队列RPOP 批量),与 HTTP 进程分离以削峰。
*/
class GameHotDataQueueConsumer
{
public function onWorkerStart(): void
{
$interval = config('game_hot_cache.queue_consumer_tick_seconds', 0.1);
if (!is_float($interval) && !is_int($interval)) {
$interval = 0.1;
}
$interval = (float) $interval;
if ($interval < 0.02) {
$interval = 0.02;
}
Timer::add($interval, static function (): void {
if (!GameHotDataWriteQueue::enabled()) {
return;
}
$key = GameHotDataWriteQueue::queueListKey();
$batch = config('game_hot_cache.queue_consumer_batch', 80);
$n = filter_var($batch, FILTER_VALIDATE_INT);
if ($n === false || $n < 1) {
$n = 80;
}
for ($i = 0; $i < $n; $i++) {
try {
$raw = Redis::rPop($key);
} catch (Throwable) {
break;
}
if ($raw === false || $raw === null || $raw === '') {
break;
}
if (!is_string($raw)) {
continue;
}
$job = json_decode($raw, true);
if (!is_array($job)) {
continue;
}
$op = $job['op'] ?? '';
if (!is_string($op) || $op === '') {
continue;
}
try {
if ($op === GameHotDataWriteQueue::OP_USER_REFRESH) {
$uid = filter_var($job['id'] ?? null, FILTER_VALIDATE_INT);
if ($uid !== false && $uid > 0) {
GameHotDataRedis::userReplaceCacheFromDb($uid);
}
continue;
}
if ($op === GameHotDataWriteQueue::OP_GC_REFRESH) {
$k = $job['key'] ?? '';
if (is_string($k) && $k !== '') {
GameHotDataRedis::gameConfigReplaceFromDb($k);
}
continue;
}
if ($op === GameHotDataWriteQueue::OP_GR_SYNC) {
$rid = filter_var($job['id'] ?? null, FILTER_VALIDATE_INT);
GameHotDataRedis::gameRecordSyncCachesAfterDbWrite($rid === false ? null : $rid);
}
} catch (Throwable) {
}
}
});
}
}