aa8eb305创建于 2025年3月23日历史提交
<?php

namespace Module\AigcBase\Job;

use ModStart\Core\Dao\ModelUtil;
use ModStart\Core\Exception\BizException;
use ModStart\Core\Input\Response;
use ModStart\Core\Job\BaseJob;
use ModStart\Core\Util\LogUtil;
use ModStart\Core\Util\SerializeUtil;
use ModStart\Core\Util\StrUtil;
use Module\AigcBase\Biz\AigcTaskBiz;
use Module\AigcBase\Model\AigcTask;
use Module\AigcBase\Util\AigcTaskUtil;
use Module\Member\Util\MemberCreditUtil;
use Module\Vendor\Type\JobStatus;

class AigcTaskProcessJob extends BaseJob
{
    public $taskId;

    public static function create($taskId, $delay = 0)
    {
        $job = new static();
        $job->taskId = $taskId;
        if ($delay) {
            $job->delay($delay);
        }
        app('Illuminate\Contracts\Bus\Dispatcher')->dispatch($job);
    }

    private function markFail($msg, $updateResultMsg = false)
    {
        LogUtil::info('AigcTaskProcessJob.fail - ' . $this->taskId, $msg);
        ModelUtil::update(AigcTask::class, $this->taskId, [
            'status' => JobStatus::FAIL,
            'statusRemark' => StrUtil::mbLimit($msg, 100),
        ]);
        if ($updateResultMsg) {
            AigcTaskUtil::updateResultMsg($this->taskId, $msg);
        }
    }

    public function handle()
    {
        LogUtil::info('AigcTaskProcessJob.start - ' . $this->taskId);
        $task = ModelUtil::get(AigcTask::class, $this->taskId);
        if (empty($task)) {
            LogUtil::info('AigcTaskProcessJob.empty - ' . $this->taskId);
            return;
        }
        if ($task['status'] == JobStatus::SUCCESS || $task['status'] == JobStatus::FAIL) {
            LogUtil::info('AigcTaskProcessJob.done - ' . $this->taskId);
            return;
        }
        ModelUtil::transactionCommit();
        ModelUtil::decodeRecordJson($task, ['modelConfig', 'result']);
        $bizer = AigcTaskBiz::getByName($task['biz']);
        if (empty($bizer)) {
            $this->markFail('bizer.empty');
            return;
        }
        $modelConfig = $task['modelConfig'];
        if ($task['status'] == JobStatus::QUEUE) {
            LogUtil::info('AigcTaskProcessJob.queue - ' . $this->taskId);
            ModelUtil::update(AigcTask::class, $this->taskId, [
                'status' => JobStatus::PROCESS,
                'startTime' => date('Y-m-d H:i:s'),
            ]);
            if (MemberCreditUtil::getTotal($task['memberUserId']) <= 0) {
                $this->markFail(modstart_module_config('Member', 'creditName', '积分') . '不足', true);
                return;
            }
            try {
                $ret = $bizer->callQueue($modelConfig, [
                    'task' => $task,
                ]);
                LogUtil::info('AigcTaskProcessJob.queueRet - ' . $this->taskId, $ret);
            } catch (BizException $e) {
                $ret = Response::generateError($e->getMessage());
            } catch (\Exception $e) {
                throw $e;
            }
            if (Response::isError($ret)) {
                $this->markFail('callQueue.error - ' . $ret['msg']);
                return;
            }
            if (empty($ret['data']['result'])) {
                $this->markFail('callQueue.result.empty');
                return;
            }
            AigcTaskUtil::updateResult($this->taskId, $ret['data']['result']);
            self::create($this->taskId, 20);
        } else {
            if (empty($task['result'])) {
                $this->markFail('result.empty');
                return;
            }
            try {
                $ret = $bizer->callQuery($task['result'], [
                    'task' => $task,
                ]);
                LogUtil::info('AigcTaskProcessJob.queryRet - ' . $this->taskId, $ret);
            } catch (BizException $e) {
                $ret = Response::generateError($e->getMessage());
            } catch (\Exception $e) {
                throw $e;
            }
            if (Response::isError($ret)) {
                $this->markFail('callQuery.error - ' . $ret['msg']);
                return;
            }
            if (empty($ret['data']['status'])) {
                $this->markFail('callQuery.status.empty');
                return;
            }
            if ($ret['data']['status'] == 'SUCCESS') {
                LogUtil::info('AigcTaskProcessJob.success - ' . $this->taskId, $ret);
                if (empty($ret['data']['result'])) {
                    $this->markFail('callQuery.result.empty');
                    return;
                }
                $result = array_merge($task['result'], $ret['data']['result']);
                $update = [];
                $update['status'] = JobStatus::SUCCESS;
                $update['cost'] = time() - strtotime($task['startTime']);
                $update['result'] = SerializeUtil::jsonEncode($result);
                ModelUtil::update(AigcTask::class, $this->taskId, $update);
                foreach ($update as $k => $v) {
                    $task[$k] = $v;
                }
                $creditCost = $bizer->creditCost($result, [
                    'task' => $task,
                ]);
                LogUtil::info('AigcTaskProcessJob.creditCost - ' . $this->taskId, $creditCost);
                if ($creditCost > 0 && $task['creditCost'] != $creditCost) {
                    ModelUtil::transactionBegin();
                    $task = ModelUtil::getWithLock(AigcTask::class, [
                        'id' => $task['id'],
                        'creditCost' => $task['creditCost'],
                    ]);
                    if (!empty($task)) {
                        ModelUtil::update(
                            AigcTask::class,
                            $task['id'],
                            [
                                'creditCost' => $creditCost,
                            ]
                        );
                        MemberCreditUtil::change(
                            $task['memberUserId'],
                            -$creditCost,
                            '运行任务消耗(ID=' . $task['id'] . ')',
                            null,
                            [
                                'checkNegative' => false,
                            ]
                        );
                    }
                    ModelUtil::transactionCommit();
                }
            } elseif ($ret['data']['status'] == 'FAIL') {
                $errorMsg = 'callQuery.fail';
                if (!empty($ret['data']['result']['msg'])) {
                    $errorMsg = $ret['data']['result']['msg'];
                }
                $this->markFail($errorMsg, true);
            } else {
                LogUtil::info('AigcTaskProcessJob.queryLater - ' . $this->taskId, $ret);
                self::create($this->taskId, 5);
            }
        }
    }
}