<?php
declare(strict_types=1);
/**
 * +----------------------------------------------------------------------
 * | ThinkAdmin Plugin for ThinkAdmin
 * +----------------------------------------------------------------------
 * | 版权所有 2014~2026 ThinkAdmin [ thinkadmin.top ]
 * +----------------------------------------------------------------------
 * | 官方网站: https://thinkadmin.top
 * +----------------------------------------------------------------------
 * | 开源协议 ( https://mit-license.org )
 * | 免责声明 ( https://thinkadmin.top/disclaimer )
 * | 会员特权 ( https://thinkadmin.top/vip-introduce )
 * +----------------------------------------------------------------------
 * | gitee 代码仓库:https://gitee.com/zoujingli/ThinkAdmin
 * | github 代码仓库:https://github.com/zoujingli/ThinkAdmin
 * +----------------------------------------------------------------------
 */
namespace think\admin\support\command;
use Psr\Log\NullLogger;
use think\admin\Command;
use think\admin\model\SystemQueue;
use think\admin\service\QueueService;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\db\exception\DataNotFoundException;
use think\db\exception\DbException;
use think\db\exception\ModelNotFoundException;
/**
 * Console command for managing asynchronous tasks.
 * @class Queue
 */
class Queue extends Command
{
/**
 * Task is waiting to be processed.
 * @var int
 */
public const STATE_WAIT = 1;
/**
 * Task is currently being processed.
 * @var int
 */
public const STATE_LOCK = 2;
/**
 * Task finished processing.
 * @var int
 */
public const STATE_DONE = 3;
/**
 * Task processing failed.
 * @var int
 */
public const STATE_ERROR = 4;
/**
 * Command string of the listener process.
 * @var string
 */
public const QUEUE_LISTEN = 'xadmin:queue listen';
/**
 * Code of the task currently being handled.
 * @var string
 */
protected $code;
/**
 * Configure the command definition.
 *
 * Registers the `xadmin:queue` command with its `host`, `port` and `daemon`
 * options and its `action` (default `listen`), `code` and `spts` arguments.
 */
public function configure()
{
    // Help text listing every action keyword accepted by the `action` argument.
    $actions = 'stop|start|status|query|listen|clean|dorun|webstop|webstart|webstatus';

    $this->setName('xadmin:queue');
    $this->setDescription('Asynchronous Command Queue Task for ThinkAdmin');

    // Options.
    $this->addOption('host', '-H', Option::VALUE_OPTIONAL, 'The host of WebServer.');
    $this->addOption('port', '-p', Option::VALUE_OPTIONAL, 'The port of WebServer.');
    $this->addOption('daemon', 'd', Option::VALUE_NONE, 'The queue listen in daemon mode');

    // Positional arguments, in the order they are read from the command line.
    $this->addArgument('action', Argument::OPTIONAL, $actions, 'listen');
    $this->addArgument('code', Argument::OPTIONAL, 'Taskcode');
    $this->addArgument('spts', Argument::OPTIONAL, 'Separator');
}
/**
 * Command entry point: dispatch to the matching `<action>Action` method.
 *
 * The action is `start` when the input reports the `daemon` option,
 * otherwise it is the value of the `action` argument. An unknown action
 * prints an error listing the allowed actions.
 *
 * @param Input  $input  console input
 * @param Output $output console output
 * @return mixed whatever the dispatched action returns, or null for an unknown action
 */
protected function execute(Input $input, Output $output)
{
    if ($input->hasOption('daemon')) {
        $action = 'start';
    } else {
        $action = $input->getArgument('action');
    }

    $handler = "{$action}Action";
    if (!method_exists($this, $handler)) {
        $this->output->error('># Wrong operation, Allow stop|start|status|query|listen|clean|dorun|webstop|webstart|webstatus');
        return null;
    }

    return $this->{$handler}();
}
/**
 * Stop the WebServer debugging process(es).
 *
 * Queries the process helper for the public directory followed by its
 * router.php and asks it to close every process that is returned.
 * The directory is built with DIRECTORY_SEPARATOR, the same way
 * webStartAction() and webStatusAction() build it, so the search pattern
 * has the same shape as the command that was launched.
 */
protected function webStopAction()
{
    $root = syspath('public' . DIRECTORY_SEPARATOR);
    $found = $this->process->query("{$root} {$root}router.php");

    if (count($found) < 1) {
        $this->output->writeln('># There are no WebServer processes to stop');
        return;
    }

    foreach ($found as $item) {
        $this->process->close(intval($item['pid']));
        $this->output->writeln("># Successfully sent end signal to process {$item['pid']}");
    }
}
/**
 * Start the WebServer debugging process.
 *
 * Builds a `php -S` command from the `host` option (default 127.0.0.1) and
 * the `port` option (default 80), serving the public directory through its
 * router.php. If a matching process is already reported, only its pid is
 * printed; otherwise the command is launched and the result is reported.
 * On Windows the address is additionally opened with `start`.
 */
protected function webStartAction()
{
    $scheme = 'http';
    $host = $this->input->getOption('host') ?: '127.0.0.1';
    $port = $this->input->getOption('port') ?: '80';
    $root = syspath('public' . DIRECTORY_SEPARATOR);

    $command = "php -S {$host}:{$port} -t {$root} {$root}router.php";
    $this->output->comment(">$ {$command}");

    // Opens the served address via `start`, only when the helper reports Windows.
    $openBrowser = function () use ($scheme, $host, $port) {
        if ($this->process->isWin()) {
            $this->process->exec("start {$scheme}://{$host}:{$port}");
        }
    };

    $running = $this->process->query($command);
    if (count($running) > 0) {
        $openBrowser();
        $this->output->writeln("># WebServer process already exist for pid {$running[0]['pid']}");
        return;
    }

    $this->process->create($command, 2000);
    $started = $this->process->query($command);
    if (count($started) < 1) {
        $this->output->writeln('># WebServer process failed to start');
        return;
    }

    $this->output->writeln("># WebServer process started successfully for pid {$started[0]['pid']}");
    $openBrowser();
}
/**
 * Show the state of the WebServer debugging process.
 *
 * Prints the command line and pid of the first process the helper returns
 * for the public directory / router.php pattern, or a "not running" notice.
 */
protected function webStatusAction()
{
    $root = syspath('public' . DIRECTORY_SEPARATOR);
    $found = $this->process->query("{$root} {$root}router.php");

    if (count($found) < 1) {
        $this->output->writeln('># The WebServer process is not running');
        return;
    }

    $first = $found[0];
    $this->output->comment(">$ {$first['cmd']}");
    $this->output->writeln("># WebServer process {$first['pid']} running");
}
/**
 * Stop all task processes.
 *
 * Asks the process helper to close every process it returns for
 * `xadmin:queue`, reporting each pid, or prints a notice when none is found.
 */
protected function stopAction()
{
    $found = $this->process->thinkQuery('xadmin:queue');

    if (count($found) < 1) {
        $this->output->writeln('># There are no task processes to stop');
        return;
    }

    foreach ($found as $item) {
        $this->process->close(intval($item['pid']));
        $this->output->writeln("># Successfully sent end signal to process {$item['pid']}");
    }
}
/**
 * Start the background listener.
 *
 * When a listener is already reported and the timestamp stored in
 * runtime/cache/time.queue is more than 60 seconds old, the listener is
 * closed and launched again. When no listener is reported, one is launched
 * and the outcome is printed.
 *
 * @throws DataNotFoundException
 * @throws DbException
 * @throws ModelNotFoundException
 */
protected function startAction()
{
    // Runs a count query before anything else; the result is not used.
    // NOTE(review): presumably a database availability check — confirm.
    SystemQueue::mk()->count();
    $this->output->comment(">$ {$this->process->think(static::QUEUE_LISTEN)}");

    $daemons = $this->process->thinkQuery(static::QUEUE_LISTEN);
    if (count($daemons) < 1) {
        $this->process->thinkExec(static::QUEUE_LISTEN, 1000);
        $daemons = $this->process->thinkQuery(static::QUEUE_LISTEN);
        if (count($daemons) > 0) {
            $this->output->writeln("># Queue daemons started successfully for pid {$daemons[0]['pid']}");
        } else {
            $this->output->writeln('># Queue daemons failed to start');
        }
        return;
    }

    $pid = $daemons[0]['pid'];
    $lock = syspath('runtime/cache/time.queue');
    $stale = is_file($lock) && intval(file_get_contents($lock)) + 60 < time();
    if (!$stale) {
        $this->output->writeln("># Queue daemons already exist for pid {$pid}");
        return;
    }

    $this->output->writeln('># The task monitoring delay has exceeded 60 seconds, and the monitoring will be restarted.');
    // Relaunch only when the close call reports success.
    if ($this->process->close(intval($pid))) {
        $this->process->thinkExec(static::QUEUE_LISTEN, 1000);
    }
}
/**
 * List all task processes.
 *
 * Prints the pid and command line of every process the helper returns for
 * `xadmin:queue`, or a notice when there is none.
 */
protected function queryAction()
{
    $found = $this->process->thinkQuery('xadmin:queue');

    if (count($found) < 1) {
        $this->output->writeln('># No related task process found');
        return;
    }

    foreach ($found as $entry) {
        $this->output->writeln("># {$entry['pid']}\t{$entry['cmd']}");
    }
}
/**
 * Clean up queue records.
 *
 * 1. Deletes records whose exec_time is older than the configured number of
 *    days (`base.queue_clean_days`, falling back to 7).
 * 2. Walks over records that are either failed with loops_time > 0, or
 *    locked with an exec_time more than one hour in the past:
 *    - records with loops_time > 0 are reset to the waiting state;
 *    - the remaining records are marked as failed.
 * A summary of the three counters is reported through setQueueSuccess().
 *
 * @throws \think\admin\Exception
 * @throws DbException
 */
protected function cleanAction()
{
    $days = intval(sysconf('base.queue_clean_days|raw') ?: 7);
    $clean = SystemQueue::mk()->where('exec_time', '<', time() - $days * 24 * 3600)->delete();

    $map1 = [['loops_time', '>', 0], ['status', '=', static::STATE_ERROR]];
    $map2 = [['exec_time', '<', time() - 3600], ['status', '=', static::STATE_LOCK]];
    [$timeout, $loops, $total] = [0, 0, SystemQueue::mk()->whereOr([$map1, $map2])->count()];

    foreach (SystemQueue::mk()->whereOr([$map1, $map2])->cursor() as $queue) {
        if ($queue['loops_time'] > 0) {
            ++$loops;
            $this->queue->message($total, $timeout + $loops, "正在重置任务 {$queue['code']} 为运行");
            // Normalise with intval() before the strict comparison, as doRunAction()
            // does: the attribute may come back as a numeric string, in which case
            // a bare `===` against the int constant would never match.
            $failed = intval($queue['status']) === static::STATE_ERROR;
            [$status, $message] = [static::STATE_WAIT, $failed ? '任务执行失败,已自动重置任务!' : '任务执行超时,已自动重置任务!'];
        } else {
            ++$timeout;
            $this->queue->message($total, $timeout + $loops, "正在标记任务 {$queue['code']} 为超时");
            [$status, $message] = [static::STATE_ERROR, '任务执行超时,已自动标识为失败!'];
        }
        $queue->save(['status' => $status, 'exec_desc' => $message]);
    }

    $this->setQueueSuccess("清理 {$clean} 条历史任务,关闭 {$timeout} 条超时任务,重置 {$loops} 条循环任务");
}
/**
 * Show the state of the listener.
 *
 * Prints the pid of the first listener process the helper returns, or a
 * notice that the listener is not running.
 */
protected function statusAction()
{
    $listeners = $this->process->thinkQuery(static::QUEUE_LISTEN);

    if (count($listeners) < 1) {
        $this->output->writeln('The Listening main process is not running');
        return;
    }

    $this->output->writeln("Listening for main process {$listeners[0]['pid']} running");
}
/**
 * Start listening for tasks.
 *
 * Replaces the database logger with a NullLogger and enters the listen
 * loop. If the loop throws an \Exception, it is handed to trace_file(),
 * reported on the console and the loop is started once more; an exception
 * thrown by that second run is not caught here.
 */
protected function listenAction()
{
    try {
        // ignore_user_abort() is only reached when set_time_limit() succeeded
        // and the script is not running under the CLI SAPI.
        set_time_limit(0) && PHP_SAPI !== 'cli' && ignore_user_abort(true);
        $this->app->db->setLog(new NullLogger());
        $this->createListenProcess();
    } catch (\Exception $exception) {
        // Pause three seconds before rebooting when trace_file() returns a truthy value.
        trace_file($exception) && usleep(3000000);
        // Each part goes on its own line; emitting the banner and the message
        // without a line break glued them to the TRY-REBOOT banner.
        $this->output->writeln('=============== EXCEPTION ===============');
        $this->output->writeln($exception->getMessage());
        $this->output->writeln('=============== TRY-REBOOT ===============');
        $this->createListenProcess();
    }
}
/**
 * Execute the task identified by the `code` argument.
 *
 * The task must be loaded and in the waiting state. It is then locked
 * (attempt counter incremented, pid and enter time recorded) and run:
 * - if the stored command is an existing class, it is instantiated and must
 *   be a think\admin\Queue or think\admin\service\QueueService;
 * - otherwise the command is split on whitespace and run as a console call.
 * Any throwable marks the task as failed, unless its code equals STATE_DONE,
 * in which case the task is recorded as done with the throwable's message.
 *
 * @throws \think\admin\Exception
 */
protected function doRunAction()
{
    // The `code` argument is optional, so it may be absent (null). Under
    // strict_types a bare trim(null) raises a TypeError, which would hide the
    // friendly error below; normalise to a string first.
    $this->code = trim(strval($this->input->getArgument('code') ?? ''));
    if (empty($this->code)) {
        $this->output->error('Task number needs to be specified for task execution');
        return;
    }

    try {
        // ignore_user_abort() is only reached when set_time_limit() succeeded
        // and the script is not running under the CLI SAPI.
        set_time_limit(0) && PHP_SAPI !== 'cli' && ignore_user_abort(true);
        $this->queue->initialize($this->code);
        if (empty($this->queue->record) || intval($this->queue->record->getAttr('status')) !== static::STATE_WAIT) {
            $this->output->warning("The or status of task {$this->code} is abnormal");
        } else {
            // Lock the task and record who is running it.
            SystemQueue::mk()->strict(false)->where(['code' => $this->code])->inc('attempts')->update([
                'enter_time' => microtime(true), 'outer_time' => 0, 'exec_pid' => getmypid(), 'exec_desc' => '', 'status' => static::STATE_LOCK,
            ]);
            $this->queue->progress(2, '>>> 任务处理开始 <<<', '0');
            // Constants exposing the running task to the executed code.
            defined('WorkQueueCall') or define('WorkQueueCall', true);
            defined('WorkQueueCode') or define('WorkQueueCode', $this->code);
            if (class_exists($command = $this->queue->record->getAttr('command'))) {
                // Class-based task.
                $class = $this->app->make($command, [], true);
                if ($class instanceof \think\admin\Queue) {
                    $this->updateQueue(static::STATE_DONE, $class->initialize($this->queue)->execute($this->queue->data) ?: '');
                } elseif ($class instanceof QueueService) {
                    $this->updateQueue(static::STATE_DONE, $class->initialize($this->queue->code)->execute($this->queue->data) ?: '');
                } else {
                    throw new \think\admin\Exception("自定义 {$command} 未继承 think\\admin\\Queue 或 think\\admin\\service\\QueueService");
                }
            } else {
                // Console-command task: collapse whitespace runs, then split into name + arguments.
                $attr = explode(' ', trim(preg_replace('|\s+|', ' ', $command)));
                $this->updateQueue(static::STATE_DONE, $this->app->console->call(array_shift($attr), $attr)->fetch(), false);
            }
        }
    } catch (\Throwable $exception) {
        // \Throwable already covers both \Error and \Exception.
        $isDone = intval($exception->getCode()) === static::STATE_DONE;
        $this->updateQueue($isDone ? static::STATE_DONE : static::STATE_ERROR, $exception->getMessage());
    }
}
/**
 * Run the listen loop (never returns normally).
 *
 * Each round writes the current timestamp to runtime/cache/time.queue,
 * then iterates over waiting tasks whose exec_time is due, ordered by
 * exec_time, and launches a `xadmin:queue dorun <code> -` process for each
 * task that the helper does not already report as running. A round that
 * finishes in under one second is followed by a one-second sleep.
 */
private function createListenProcess()
{
    $this->output->writeln("\n\tYou can exit with <info>`CTRL-C`</info>");
    $this->output->writeln('=============== LISTENING ===============');

    for (;;) {
        // Timestamp file read by startAction() to detect a stalled listener.
        @file_put_contents(syspath('runtime/cache/time.queue'), strval(time()));

        $where = [['status', '=', static::STATE_WAIT], ['exec_time', '<=', time()]];
        $began = microtime(true);
        $pending = SystemQueue::mk()->where($where)->order('exec_time asc')->cursor();

        foreach ($pending as $task) {
            try {
                $args = "xadmin:queue dorun {$task['code']} -";
                $this->output->comment(">$ {$this->process->think($args)}");
                $running = $this->process->thinkQuery($args);
                if (count($running) < 1) {
                    $this->process->thinkExec($args);
                    $this->output->writeln("># Created new process -> [{$task['code']}] {$task['title']}");
                } else {
                    $this->output->writeln("># Already in progress -> [{$task['code']}] {$task['title']}");
                }
            } catch (\Exception $failure) {
                // Launch failed: record the task as failed together with the reason.
                $task->save(['status' => static::STATE_ERROR, 'outer_time' => time(), 'exec_desc' => $failure->getMessage()]);
                $this->output->error("># Execution failed -> [{$task['code']}] {$task['title']},{$failure->getMessage()}");
            }
        }

        if (microtime(true) < $began + 1) {
            usleep(1000000);
        }
    }
}
/**
 * Update the state of the current task.
 *
 * Stores the status, the finish time, the pid and the first line of the
 * message on the task record, forwards the full message to the process
 * helper, and writes progress entries. When the task has a positive
 * loops_time it is re-initialised and reset with that interval; a failure
 * there is only logged.
 *
 * @param int    $status  task status
 * @param string $message message content
 * @param bool   $isSplit whether to split the message into lines and keep only the first as description
 * @throws \think\admin\Exception
 */
private function updateQueue(int $status, string $message, bool $isSplit = true)
{
    $lines = $isSplit ? explode("\n", trim($message)) : [$message];
    $summary = $lines[0];

    SystemQueue::mk()->strict(false)->where(['code' => $this->code])->update([
        'status'     => $status,
        'outer_time' => microtime(true),
        'exec_pid'   => getmypid(),
        'exec_desc'  => $summary,
    ]);
    $this->process->message($message);

    if (!empty($summary)) {
        $this->queue->progress($status, ">>> {$summary} <<<");
    }

    if ($status === static::STATE_DONE) {
        $this->queue->progress($status, '>>> 任务处理完成 <<<', '100.00');
    } elseif ($status === static::STATE_ERROR) {
        $this->queue->progress($status, '>>> 任务处理失败 <<<');
    }

    // Tasks with a positive loops_time are reset with that interval.
    $time = intval($this->queue->record->getAttr('loops_time'));
    if ($time <= 0) {
        return;
    }

    try {
        $this->queue->initialize($this->code)->reset($time);
    } catch (\Error|\Exception|\Throwable $failure) {
        $this->app->log->error("Queue {$this->queue->record->getAttr('code')} Loops Failed. {$failure->getMessage()}");
    }
}
}