Files
webman-buildadmin-mall/app/process/AngpowImportJobs.php

439 lines
14 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace app\process;
use app\common\model\MallItem;
use app\common\model\MallOrder;
use app\common\model\MallUserAsset;
use GuzzleHttp\Client;
use support\Log;
use Workerman\Timer;
use Workerman\Worker;
/**
* Angpow 导入推送任务
* - 数据源mall_ordertype=BONUS
* - 推送频率:每 30 秒
* - 批量:每次最多 100 条(对方文档限制)
* - 幂等merchant_code + report_date 级别签名;每条订单通过 external_transaction_id 在本地控制只推送一次
*/
class AngpowImportJobs
{
private const TIMER_SECONDS = 30;
private const BATCH_LIMIT = 100;
private const MAX_RETRY = 3;
protected Client $http;
public function __construct()
{
// 确保定时任务只在一个 worker 上运行
if (!Worker::getAllWorkers()) {
return;
}
$this->http = new Client($this->buildGuzzleOptions());
Timer::add(self::TIMER_SECONDS, [$this, 'pushPendingOrders']);
}
/**
* Guzzle 默认校验 HTTPSWindows 未配置 CA 时会出现 cURL error 60。
* 优先使用 PLAYX_ANGPOW_IMPORT_CACERT 指向 cacert.pem否则可按环境关闭校验仅开发
*/
private function buildGuzzleOptions(): array
{
$options = [
'timeout' => 20,
'http_errors' => false,
];
$conf = config('playx.angpow_import');
if (!is_array($conf)) {
return $options;
}
$caFile = $conf['ca_file'] ?? '';
if (is_string($caFile) && $caFile !== '' && is_file($caFile)) {
$options['verify'] = $caFile;
return $options;
}
$verifySsl = $conf['verify_ssl'] ?? true;
if ($verifySsl === false) {
$options['verify'] = false;
}
return $options;
}
public function pushPendingOrders(): void
{
$conf = config('playx.angpow_import');
if (!is_array($conf)) {
return;
}
$baseUrl = $conf['base_url'] ?? '';
$path = $conf['path'] ?? '';
$merchantCode = $conf['merchant_code'] ?? '';
$authKey = $conf['auth_key'] ?? '';
if (!is_string($baseUrl) || $baseUrl === '' || !is_string($path) || $path === '' || !is_string($merchantCode) || $merchantCode === '') {
return;
}
if (!is_string($authKey) || $authKey === '') {
return;
}
$url = rtrim($baseUrl, '/') . $path;
$orders = MallOrder::where('type', MallOrder::TYPE_BONUS)
->whereIn('grant_status', [MallOrder::GRANT_NOT_SENT, MallOrder::GRANT_FAILED_RETRYABLE])
->where('status', MallOrder::STATUS_PENDING)
->where('retry_count', '<', self::MAX_RETRY)
->order('id', 'asc')
->limit(self::BATCH_LIMIT)
->select();
if ($orders->isEmpty()) {
return;
}
$reportDate = strval(time());
$signatureInput = 'merchant_code=' . $merchantCode . '&report_date=' . $reportDate;
$signature = $this->buildSignature($signatureInput, $authKey);
if ($signature === null) {
return;
}
$payload = [
'merchant_code' => $merchantCode,
'report_date' => $reportDate,
'angpow' => [],
'currency_visual' => [
[
'currency' => strval($conf['currency'] ?? 'MYR'),
'visual_name' => strval($conf['visual_name'] ?? 'Angpow'),
],
],
];
$orderIds = [];
foreach ($orders as $order) {
if (!$order instanceof MallOrder) {
continue;
}
$rowResult = $this->buildAngpowRow($order);
if ($rowResult === null) {
// 构造失败:直接标为可重试失败
$this->markFailedAttempt($order, 'Build payload failed');
continue;
}
if (is_string($rowResult)) {
$this->markFailedAttempt($order, $rowResult);
continue;
}
$payload['angpow'][] = $rowResult;
$orderIds[] = $order->id;
}
if (empty($payload['angpow'])) {
return;
}
// 先标记“已尝试发送”,避免并发重复推送;同时只在这里累加 retry_count一次推送=一次尝试)
$now = time();
foreach ($orders as $order) {
if (!$order instanceof MallOrder) {
continue;
}
if (!in_array($order->id, $orderIds, true)) {
continue;
}
$retry = $order->retry_count ?? 0;
if (!is_int($retry)) {
$retry = is_numeric($retry) ? intval($retry) : 0;
}
$order->retry_count = $retry + 1;
$order->grant_status = MallOrder::GRANT_SENT_PENDING;
$order->update_time = $now;
$order->save();
}
$res = null;
$body = '';
try {
$res = $this->http->post($url, [
'headers' => [
'Content-Type' => 'application/json',
'X-Request-Signature' => $signature,
],
'json' => $payload,
]);
$body = strval($res->getBody());
} catch (\Throwable $e) {
// 网络/异常:对这一批订单记一次失败尝试
foreach ($orders as $order) {
if (!$order instanceof MallOrder) {
continue;
}
$this->markFailedAttempt($order, $e->getMessage());
}
return;
}
$data = json_decode($body, true);
if (!is_array($data)) {
Log::error($this->formatAngpowInvalidJsonBodyForLog($res, $body, $orderIds));
foreach ($orders as $order) {
if (!$order instanceof MallOrder) {
continue;
}
$this->markFailedAttempt($order, 'Invalid response');
}
return;
}
$code = $data['code'] ?? null;
$message = $data['message'] ?? '';
$msg = is_string($message) ? $message : 'Request failed';
// 成功code=0
if ($code === '0' || $code === 0) {
MallOrder::whereIn('id', $orderIds)->update([
'grant_status' => MallOrder::GRANT_ACCEPTED,
'status' => MallOrder::STATUS_COMPLETED,
'fail_reason' => null,
'update_time' => time(),
]);
return;
}
// 失败:整批视为失败(对方未提供逐条返回);解析后的 JSON 仅写入项目日志
$this->logAngpowPushRejectedParsedBody($orderIds, $data);
foreach ($orders as $order) {
if (!$order instanceof MallOrder) {
continue;
}
$this->markFailedAttempt($order, $msg);
}
}
/**
* @return array<string, mixed>|string|null 成功返回行数组;用户名缺失返回错误文案字符串;其它构造失败返回 null
*/
private function buildAngpowRow(MallOrder $order): array|string|null
{
$asset = MallUserAsset::where('playx_user_id', $order->user_id)->find();
if (!$asset) {
if (is_string($order->user_id) && ctype_digit($order->user_id)) {
$byId = MallUserAsset::where('id', $order->user_id)->find();
if ($byId) {
$asset = $byId;
}
}
}
if (!$asset || !is_string($asset->playx_user_id ?? null) || strval($asset->playx_user_id) === '') {
return null;
}
$memberLogin = trim(strval($asset->username ?? ''));
if ($memberLogin === '') {
return 'User username empty';
}
$item = null;
if ($order->mallItem) {
$item = $order->mallItem;
} else {
$item = MallItem::where('id', $order->mall_item_id)->find();
}
if (!$item) {
return null;
}
// $createTime = $order->create_time ?? null;
// if (!is_int($createTime)) {
// if (is_numeric($createTime)) {
// $createTime = intval($createTime);
// } else {
// $createTime = time();
// }
// }
$start = gmdate('Y-m-d\TH:i:s\Z', strtotime($order->start_time));
$end = gmdate('Y-m-d\TH:i:s\Z', strtotime($order->end_time));
return [
'member_login' => $memberLogin,
'start_time' => $start,
'end_time' => $end,
'amount' => $order->amount,
'reward_name' => strval($item->title ?? ''),
'description' => strval($item->description ?? ''),
'member_inbox_message' => 'Congratulations! You received an angpow.',
'category' => strval($item->category ?? ''),
'category_title' => strval($item->category_title ?? ''),
'one_time_turnover' => 'yes',
'multiplier' => $order->multiplier,
];
}
/**
* 对方返回体无法解析为「根级 JSON 对象」时,生成写入项目日志的完整说明(不落库订单 fail_reason
*
* @param list<int|string> $orderIds
* @param mixed $response Guzzle 响应或其它
*/
private function formatAngpowInvalidJsonBodyForLog(mixed $response, string $rawBody, array $orderIds): string
{
$httpPart = 'HTTP unknown';
if (is_object($response) && method_exists($response, 'getStatusCode')) {
$code = $response->getStatusCode();
if (is_int($code)) {
$httpPart = 'HTTP ' . $code;
}
}
$trimmed = trim($rawBody);
if ($trimmed === '') {
$detail = 'empty body';
} elseif (json_last_error() !== JSON_ERROR_NONE) {
$msg = json_last_error_msg();
$detail = is_string($msg) && $msg !== '' ? $msg : 'JSON parse error';
} else {
$detail = 'root is not a JSON object';
}
$idPart = implode(',', $orderIds);
$snippet = '';
if ($trimmed !== '') {
$oneLine = preg_replace('/\s+/', ' ', $trimmed);
$snippet = is_string($oneLine) && $oneLine !== '' ? $oneLine : $trimmed;
$maxLen = 8000;
if (function_exists('mb_strlen') && function_exists('mb_substr')) {
if (mb_strlen($snippet, 'UTF-8') > $maxLen) {
$snippet = mb_substr($snippet, 0, $maxLen, 'UTF-8') . '…';
}
} elseif (strlen($snippet) > $maxLen) {
$snippet = substr($snippet, 0, $maxLen) . '…';
}
}
$head = '[AngpowImport] response not a JSON object | order_ids=' . $idPart . ' | ' . $httpPart . ' | ' . $detail;
if ($snippet === '') {
return $head;
}
return $head . ' | body=' . $snippet;
}
/**
* 可解析 JSON 但业务失败时,将解析结果写入项目日志(订单仍只记 message 等业务文案)。
*
* @param list<int|string> $orderIds
* @param array<mixed> $parsed
*/
private function logAngpowPushRejectedParsedBody(array $orderIds, array $parsed): void
{
$flags = JSON_UNESCAPED_UNICODE;
if (defined('JSON_INVALID_UTF8_SUBSTITUTE')) {
$flags = $flags | JSON_INVALID_UTF8_SUBSTITUTE;
}
$encoded = json_encode($parsed, $flags);
if (!is_string($encoded)) {
$encoded = 'json_encode failed';
}
$maxLen = 8000;
if (function_exists('mb_strlen') && function_exists('mb_substr')) {
if (mb_strlen($encoded, 'UTF-8') > $maxLen) {
$encoded = mb_substr($encoded, 0, $maxLen, 'UTF-8') . '…';
}
} elseif (strlen($encoded) > $maxLen) {
$encoded = substr($encoded, 0, $maxLen) . '…';
}
Log::error('[AngpowImport] push rejected (parsed JSON) | order_ids=' . implode(',', $orderIds) . ' | body=' . $encoded);
}
/**
* 单条失败原因压成一行,避免异常信息自带换行导致与 attempt 分段混在一起。
*/
private function normalizeReasonLine(string $reason): string
{
$s = trim($reason);
if ($s === '') {
return '';
}
$s = str_replace(["\r\n", "\r", "\n"], ' ', $s);
$replaced = preg_replace('/\s+/', ' ', $s);
return is_string($replaced) && $replaced !== '' ? $replaced : $s;
}
private function markFailedAttempt(MallOrder $order, string $reason): void
{
// retry_count 在“准备发送”阶段已 +1此处用当前 retry_count 作为 attempt 编号
$retryCount = $order->retry_count ?? 0;
$attempt = is_int($retryCount) ? $retryCount : (is_numeric($retryCount) ? intval($retryCount) : 0);
if ($attempt <= 0) {
$attempt = 1;
$order->retry_count = 1;
}
$prev = $order->fail_reason;
$prefix = 'attempt ' . $attempt . ': ';
$line = $prefix . $this->normalizeReasonLine($reason);
$newReason = $line;
if (is_string($prev) && $prev !== '') {
$newReason = $prev . "\n" . $line;
}
$final = $attempt >= self::MAX_RETRY;
$order->grant_status = $final ? MallOrder::GRANT_FAILED_FINAL : MallOrder::GRANT_FAILED_RETRYABLE;
$order->fail_reason = $newReason;
$order->save();
}
/**
* 生成对方要求的 Base64(HMAC-SHA1)
* - 文档中示例 python 会 base64_decode(key) 后参与 hmac
* - 生产 key 由 BA 提供,可能是 base64 或 hex这里做兼容处理
*/
private function buildSignature(string $input, string $authKey): ?string
{
$keyBytes = null;
$maybeBase64 = base64_decode($authKey, true);
if ($maybeBase64 !== false && $maybeBase64 !== '') {
$keyBytes = $maybeBase64;
}
if ($keyBytes === null) {
$isHex = ctype_xdigit($authKey) && (strlen($authKey) % 2 === 0);
if ($isHex) {
$hex = hex2bin($authKey);
if ($hex !== false && $hex !== '') {
$keyBytes = $hex;
}
}
}
if ($keyBytes === null) {
$keyBytes = $authKey;
}
$raw = hash_hmac('sha1', $input, $keyBytes, true);
if (!is_string($raw) || $raw === '') {
return null;
}
return base64_encode($raw);
}
}