Files
lotteryLaravel/app/Services/Wallet/WalletTransferReconcileDetector.php
kang 5398af0a55 feat: add wallet transfer reconcile command and schedule task
新增钱包划转对账命令行工具与定时任务,用于扫描主站与彩票侧的转账流水差异,记录掉单和异常划转单,并生成对账报告。
2026-05-15 10:41:39 +08:00

255 lines
8.7 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace App\Services\Wallet;
use Carbon\Carbon;
use App\Models\WalletTxn;
use App\Models\TransferOrder;
use App\Models\ReconcileJob;
use App\Models\ReconcileItem;
use Illuminate\Support\Str;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\DB;
/**
* 钱包划转自动对账扫描器。
*
* 作用:
* - 发现主站与彩票侧的流水不一致
* - 发现长时间停留在 processing / pending_reconcile 的划转单
* - 将结果落到 reconcile_jobs / reconcile_items供后台查看与后续处理
*/
final class WalletTransferReconcileDetector
{
private const RECONCILE_TYPE = 'wallet_transfer';
public function scanAndRecord(
?Carbon $periodStart = null,
?Carbon $periodEnd = null,
int $limit = 500,
int $staleMinutes = 15,
): ?ReconcileJob {
$periodEnd ??= now();
$periodStart ??= $periodEnd->copy()->subDay();
$limit = max(1, $limit);
$staleMinutes = max(1, $staleMinutes);
$staleCutoff = $periodEnd->copy()->subMinutes($staleMinutes);
$orders = TransferOrder::query()
->where(function ($q) use ($periodStart, $periodEnd, $staleCutoff): void {
$q->whereBetween('created_at', [$periodStart, $periodEnd])
->orWhere('status', 'pending_reconcile')
->orWhere(function ($inner) use ($staleCutoff): void {
$inner->where('status', 'processing')
->where('created_at', '<=', $staleCutoff);
});
})
->orderBy('id')
->limit($limit)
->get();
if ($orders->isEmpty()) {
return null;
}
$txnsByTransferNo = WalletTxn::query()
->whereIn('biz_no', $orders->pluck('transfer_no')->all())
->orderBy('id')
->get()
->groupBy(static fn (WalletTxn $txn): string => (string) $txn->biz_no);
$items = [];
foreach ($orders as $order) {
$issue = $this->issueForOrder(
$order,
$txnsByTransferNo->get((string) $order->transfer_no, collect()),
$periodEnd,
$staleMinutes,
);
if ($issue !== null) {
$items[] = $issue;
}
}
if ($items === []) {
return null;
}
return DB::transaction(function () use ($items, $periodStart, $periodEnd): ReconcileJob {
$jobNo = 'REC'.now()->format('YmdHis').strtoupper(str_replace('-', '', Str::uuid()->toString()));
$job = ReconcileJob::query()->create([
'job_no' => $jobNo,
'admin_user_id' => null,
'reconcile_type' => self::RECONCILE_TYPE,
'status' => 'completed',
'period_start' => $periodStart,
'period_end' => $periodEnd,
'summary_json' => [
'item_count' => count($items),
'mismatch_count' => count($items),
],
'finished_at' => now(),
]);
foreach ($items as $row) {
ReconcileItem::query()->create([
'reconcile_job_id' => (int) $job->getKey(),
'side_a_ref' => $row['side_a_ref'],
'side_b_ref' => $row['side_b_ref'],
'difference_amount' => (int) $row['difference_amount'],
'status' => $row['status'],
'resolved_at' => null,
]);
}
return $job->fresh(['items']);
});
}
/**
* @return array{side_a_ref: string, side_b_ref: ?string, difference_amount: int, status: string}|null
*/
private function issueForOrder(TransferOrder $order, Collection $txns, Carbon $periodEnd, int $staleMinutes): ?array
{
$amount = (int) $order->amount;
$latestTxnNo = $this->latestTxnNo($txns);
$createdAt = $order->created_at;
if ($order->status === 'processing') {
if ($createdAt !== null && $createdAt->greaterThan($periodEnd->copy()->subMinutes($staleMinutes))) {
return null;
}
return [
'side_a_ref' => (string) $order->transfer_no,
'side_b_ref' => $latestTxnNo,
'difference_amount' => $amount,
'status' => 'stale_processing',
];
}
if ($order->status === 'pending_reconcile') {
return [
'side_a_ref' => (string) $order->transfer_no,
'side_b_ref' => $latestTxnNo,
'difference_amount' => $amount,
'status' => 'pending_reconcile',
];
}
if ($order->status === 'success') {
$primaryBizType = $order->direction === 'out' ? 'transfer_out' : 'transfer_in';
if (! $this->hasTxn($txns, $primaryBizType, 'posted')) {
return [
'side_a_ref' => (string) $order->transfer_no,
'side_b_ref' => $latestTxnNo,
'difference_amount' => $amount,
'status' => 'missing_wallet_txn',
];
}
if ($order->direction === 'out' && $this->hasTxn($txns, 'transfer_out_refund')) {
return [
'side_a_ref' => (string) $order->transfer_no,
'side_b_ref' => $latestTxnNo,
'difference_amount' => $amount,
'status' => 'unexpected_wallet_txn',
];
}
if ($order->direction === 'in' && $txns->count() !== 1) {
return [
'side_a_ref' => (string) $order->transfer_no,
'side_b_ref' => $latestTxnNo,
'difference_amount' => $amount,
'status' => 'unexpected_wallet_txn',
];
}
return null;
}
if ($order->status === 'failed') {
if ($order->direction === 'in') {
if ($txns->isNotEmpty()) {
return [
'side_a_ref' => (string) $order->transfer_no,
'side_b_ref' => $latestTxnNo,
'difference_amount' => $amount,
'status' => 'unexpected_wallet_txn',
];
}
return null;
}
if (($order->fail_reason ?? '') === 'insufficient_balance') {
if ($txns->isNotEmpty()) {
return [
'side_a_ref' => (string) $order->transfer_no,
'side_b_ref' => $latestTxnNo,
'difference_amount' => $amount,
'status' => 'unexpected_wallet_txn',
];
}
return null;
}
$hasTransferOut = $this->hasTxn($txns, 'transfer_out', 'posted');
$hasRefund = $this->hasTxn($txns, 'transfer_out_refund', 'posted');
if (! $hasTransferOut || ! $hasRefund) {
return [
'side_a_ref' => (string) $order->transfer_no,
'side_b_ref' => $latestTxnNo,
'difference_amount' => $amount,
'status' => ! $hasTransferOut ? 'missing_wallet_txn' : 'missing_refund',
];
}
return null;
}
if ($order->status === 'reversed') {
if ($order->direction === 'out') {
$hasTransferOut = $this->hasTxn($txns, 'transfer_out', 'posted');
$hasReversal = $this->hasTxn($txns, 'reversal', 'posted');
if (! $hasTransferOut || ! $hasReversal) {
return [
'side_a_ref' => (string) $order->transfer_no,
'side_b_ref' => $latestTxnNo,
'difference_amount' => $amount,
'status' => ! $hasReversal ? 'missing_reversal' : 'missing_wallet_txn',
];
}
}
return null;
}
return null;
}
private function hasTxn(Collection $txns, string $bizType, ?string $status = null): bool
{
return $txns->contains(static function (WalletTxn $txn) use ($bizType, $status): bool {
if ($txn->biz_type !== $bizType) {
return false;
}
return $status === null || $txn->status === $status;
});
}
private function latestTxnNo(Collection $txns): ?string
{
/** @var WalletTxn|null $txn */
$txn = $txns->sortByDesc('id')->first();
return $txn !== null ? (string) $txn->txn_no : null;
}
}