e497a93d创建于 2025年8月25日历史提交
<?php

/**
 * @package    Grav\Common\Scheduler
 *
 * @copyright  Copyright (c) 2015 - 2025 Trilby Media, LLC. All rights reserved.
 * @license    MIT License; see LICENSE file for details.
 */

namespace Grav\Common\Scheduler;

use RocketTheme\Toolbox\File\JsonFile;
use RuntimeException;

/**
 * File-based job queue implementation
 * 
 * @package Grav\Common\Scheduler
 */
class JobQueue
{
    /** @var string */
    protected $queuePath;
    
    /** @var string */
    protected $lockFile;
    
    /** @var array Priority levels */
    const PRIORITY_HIGH = 'high';
    const PRIORITY_NORMAL = 'normal';
    const PRIORITY_LOW = 'low';
    
    /**
     * JobQueue constructor
     * 
     * @param string $queuePath
     */
    public function __construct(string $queuePath)
    {
        $this->queuePath = $queuePath;
        $this->lockFile = $queuePath . '/.lock';
        
        // Create queue directories
        $this->initializeDirectories();
    }
    
    /**
     * Initialize queue directories
     * 
     * @return void
     */
    protected function initializeDirectories(): void
    {
        $dirs = [
            $this->queuePath . '/pending',
            $this->queuePath . '/processing',
            $this->queuePath . '/failed',
            $this->queuePath . '/completed',
        ];
        
        foreach ($dirs as $dir) {
            if (!file_exists($dir)) {
                mkdir($dir, 0755, true);
            }
        }
    }
    
    /**
     * Push a job to the queue
     * 
     * @param Job $job
     * @param string $priority
     * @return string Job queue ID
     */
    public function push(Job $job, string $priority = self::PRIORITY_NORMAL): string
    {
        $queueId = $this->generateQueueId($job);
        $timestamp = microtime(true);
        
        $queueItem = [
            'id' => $queueId,
            'job_id' => $job->getId(),
            'command' => is_string($job->getCommand()) ? $job->getCommand() : 'Closure',
            'arguments' => method_exists($job, 'getRawArguments') ? $job->getRawArguments() : $job->getArguments(),
            'priority' => $priority,
            'timestamp' => $timestamp,
            'attempts' => 0,
            'max_attempts' => method_exists($job, 'getMaxAttempts') ? $job->getMaxAttempts() : 1,
            'created_at' => date('c'),
            'scheduled_for' => null,
            'metadata' => [],
        ];
        
        // Always serialize the job to preserve its full state
        $queueItem['serialized_job'] = base64_encode(serialize($job));
        
        $this->writeQueueItem($queueItem, 'pending');
        
        return $queueId;
    }
    
    /**
     * Push a job for delayed execution
     * 
     * @param Job $job
     * @param \DateTime $scheduledFor
     * @param string $priority
     * @return string
     */
    public function pushDelayed(Job $job, \DateTime $scheduledFor, string $priority = self::PRIORITY_NORMAL): string
    {
        $queueId = $this->push($job, $priority);
        
        // Update the scheduled time
        $item = $this->getQueueItem($queueId, 'pending');
        if ($item) {
            $item['scheduled_for'] = $scheduledFor->format('c');
            $this->writeQueueItem($item, 'pending');
        }
        
        return $queueId;
    }
    
    /**
     * Pop the next job from the queue
     * 
     * @return Job|null
     */
    public function pop(): ?Job
    {
        if (!$this->lock()) {
            return null;
        }
        
        try {
            // Get all pending items
            $items = $this->getPendingItems();
            
            if (empty($items)) {
                $this->unlock();
                return null;
            }
            
            // Sort by priority and timestamp
            usort($items, function($a, $b) {
                $priorityOrder = [
                    self::PRIORITY_HIGH => 0,
                    self::PRIORITY_NORMAL => 1,
                    self::PRIORITY_LOW => 2,
                ];
                
                $aPriority = $priorityOrder[$a['priority']] ?? 1;
                $bPriority = $priorityOrder[$b['priority']] ?? 1;
                
                if ($aPriority !== $bPriority) {
                    return $aPriority - $bPriority;
                }
                
                return $a['timestamp'] <=> $b['timestamp'];
            });
            
            // Get the first item that's ready to run
            $now = new \DateTime();
            foreach ($items as $item) {
                if ($item['scheduled_for']) {
                    $scheduledTime = new \DateTime($item['scheduled_for']);
                    if ($scheduledTime > $now) {
                        continue; // Skip items not yet due
                    }
                }
                
                // Move to processing
                $this->moveQueueItem($item['id'], 'pending', 'processing');
                
                // Reconstruct the job
                $job = $this->reconstructJob($item);
                
                $this->unlock();
                return $job;
            }
            
            $this->unlock();
            return null;
            
        } catch (\Exception $e) {
            $this->unlock();
            throw $e;
        }
    }
    
    /**
     * Pop a job from the queue with its queue ID
     * 
     * @return array|null Array with 'job' and 'id' keys
     */
    public function popWithId(): ?array
    {
        if (!$this->lock()) {
            return null;
        }
        
        try {
            // Get all pending items
            $items = $this->getPendingItems();
            
            if (empty($items)) {
                $this->unlock();
                return null;
            }
            
            // Sort by priority and timestamp
            usort($items, function($a, $b) {
                $priorityOrder = [
                    self::PRIORITY_HIGH => 0,
                    self::PRIORITY_NORMAL => 1,
                    self::PRIORITY_LOW => 2,
                ];
                
                $aPriority = $priorityOrder[$a['priority']] ?? 1;
                $bPriority = $priorityOrder[$b['priority']] ?? 1;
                
                if ($aPriority !== $bPriority) {
                    return $aPriority - $bPriority;
                }
                
                return $a['timestamp'] <=> $b['timestamp'];
            });
            
            // Get the first item that's ready to run
            $now = new \DateTime();
            foreach ($items as $item) {
                if ($item['scheduled_for']) {
                    $scheduledTime = new \DateTime($item['scheduled_for']);
                    if ($scheduledTime > $now) {
                        continue; // Skip items not yet due
                    }
                }
                
                // Reconstruct the job first before moving it
                $job = $this->reconstructJob($item);
                
                if (!$job) {
                    // Failed to reconstruct, skip this item
                    continue;
                }
                
                // Move to processing only if we can reconstruct the job
                $this->moveQueueItem($item['id'], 'pending', 'processing');
                
                $this->unlock();
                return ['job' => $job, 'id' => $item['id']];
            }
            
            $this->unlock();
            return null;
            
        } catch (\Exception $e) {
            $this->unlock();
            throw $e;
        }
    }
    
    /**
     * Mark a job as completed
     * 
     * @param string $queueId
     * @return void
     */
    public function complete(string $queueId): void
    {
        $this->moveQueueItem($queueId, 'processing', 'completed');
        
        // Clean up old completed items
        $this->cleanupCompleted();
    }
    
    /**
     * Mark a job as failed
     * 
     * @param string $queueId
     * @param string $error
     * @return void
     */
    public function fail(string $queueId, string $error = ''): void
    {
        $item = $this->getQueueItem($queueId, 'processing');
        
        if ($item) {
            $item['attempts']++;
            $item['last_error'] = $error;
            $item['failed_at'] = date('c');
            
            if ($item['attempts'] < $item['max_attempts']) {
                // Move back to pending for retry
                $item['retry_at'] = $this->calculateRetryTime($item['attempts']);
                $item['scheduled_for'] = $item['retry_at'];
                $this->writeQueueItem($item, 'pending');
                $this->deleteQueueItem($queueId, 'processing');
            } else {
                // Move to failed (dead letter queue)
                $this->writeQueueItem($item, 'failed');
                $this->deleteQueueItem($queueId, 'processing');
            }
        }
    }
    
    /**
     * Get queue size
     * 
     * @return int
     */
    public function size(): int
    {
        return count($this->getPendingItems());
    }
    
    /**
     * Check if queue is empty
     * 
     * @return bool
     */
    public function isEmpty(): bool
    {
        return $this->size() === 0;
    }
    
    /**
     * Get queue statistics
     * 
     * @return array
     */
    public function getStatistics(): array
    {
        return [
            'pending' => count($this->getPendingItems()),
            'processing' => count($this->getItemsInDirectory('processing')),
            'failed' => count($this->getItemsInDirectory('failed')),
            'completed_today' => $this->countCompletedToday(),
        ];
    }
    
    /**
     * Generate a unique queue ID
     * 
     * @param Job $job
     * @return string
     */
    protected function generateQueueId(Job $job): string
    {
        return $job->getId() . '_' . uniqid('', true);
    }
    
    /**
     * Write queue item to disk
     * 
     * @param array $item
     * @param string $directory
     * @return void
     */
    protected function writeQueueItem(array $item, string $directory): void
    {
        $path = $this->queuePath . '/' . $directory . '/' . $item['id'] . '.json';
        $file = JsonFile::instance($path);
        $file->save($item);
    }
    
    /**
     * Read queue item from disk
     * 
     * @param string $queueId
     * @param string $directory
     * @return array|null
     */
    protected function getQueueItem(string $queueId, string $directory): ?array
    {
        $path = $this->queuePath . '/' . $directory . '/' . $queueId . '.json';
        
        if (!file_exists($path)) {
            return null;
        }
        
        $file = JsonFile::instance($path);
        return $file->content();
    }
    
    /**
     * Delete queue item
     * 
     * @param string $queueId
     * @param string $directory
     * @return void
     */
    protected function deleteQueueItem(string $queueId, string $directory): void
    {
        $path = $this->queuePath . '/' . $directory . '/' . $queueId . '.json';
        
        if (file_exists($path)) {
            unlink($path);
        }
    }
    
    /**
     * Move queue item between directories
     * 
     * @param string $queueId
     * @param string $fromDir
     * @param string $toDir
     * @return void
     */
    protected function moveQueueItem(string $queueId, string $fromDir, string $toDir): void
    {
        $fromPath = $this->queuePath . '/' . $fromDir . '/' . $queueId . '.json';
        $toPath = $this->queuePath . '/' . $toDir . '/' . $queueId . '.json';
        
        if (file_exists($fromPath)) {
            rename($fromPath, $toPath);
        }
    }
    
    /**
     * Get all pending items
     * 
     * @return array
     */
    protected function getPendingItems(): array
    {
        return $this->getItemsInDirectory('pending');
    }
    
    /**
     * Get items in a specific directory
     * 
     * @param string $directory
     * @return array
     */
    protected function getItemsInDirectory(string $directory): array
    {
        $items = [];
        $path = $this->queuePath . '/' . $directory;
        
        if (!is_dir($path)) {
            return $items;
        }
        
        $files = glob($path . '/*.json');
        foreach ($files as $file) {
            $jsonFile = JsonFile::instance($file);
            $items[] = $jsonFile->content();
        }
        
        return $items;
    }
    
    /**
     * Reconstruct a job from queue item
     * 
     * @param array $item
     * @return Job|null
     */
    protected function reconstructJob(array $item): ?Job
    {
        if (isset($item['serialized_job'])) {
            // Unserialize the job
            try {
                $job = unserialize(base64_decode($item['serialized_job']));
                if ($job instanceof Job) {
                    return $job;
                }
            } catch (\Exception $e) {
                // Failed to unserialize
                return null;
            }
        }
        
        // Create a new job from command
        if (isset($item['command'])) {
            $args = $item['arguments'] ?? [];
            $job = new Job($item['command'], $args, $item['job_id']);
            return $job;
        }
        
        return null;
    }
    
    /**
     * Calculate retry time with exponential backoff
     * 
     * @param int $attempts
     * @return string
     */
    protected function calculateRetryTime(int $attempts): string
    {
        $backoffSeconds = min(pow(2, $attempts) * 60, 3600); // Max 1 hour
        $retryTime = new \DateTime();
        $retryTime->modify("+{$backoffSeconds} seconds");
        return $retryTime->format('c');
    }
    
    /**
     * Clean up old completed items
     * 
     * @return void
     */
    protected function cleanupCompleted(): void
    {
        $items = $this->getItemsInDirectory('completed');
        $cutoff = new \DateTime('-24 hours');
        
        foreach ($items as $item) {
            if (isset($item['created_at'])) {
                $createdAt = new \DateTime($item['created_at']);
                if ($createdAt < $cutoff) {
                    $this->deleteQueueItem($item['id'], 'completed');
                }
            }
        }
    }
    
    /**
     * Count completed jobs today
     * 
     * @return int
     */
    protected function countCompletedToday(): int
    {
        $items = $this->getItemsInDirectory('completed');
        $today = new \DateTime('today');
        $count = 0;
        
        foreach ($items as $item) {
            if (isset($item['created_at'])) {
                $createdAt = new \DateTime($item['created_at']);
                if ($createdAt >= $today) {
                    $count++;
                }
            }
        }
        
        return $count;
    }
    
    /**
     * Acquire lock for queue operations
     * 
     * @return bool
     */
    protected function lock(): bool
    {
        $attempts = 0;
        $maxAttempts = 50; // 5 seconds total
        
        while ($attempts < $maxAttempts) {
            // Check if lock file exists and is stale (older than 30 seconds)
            if (file_exists($this->lockFile)) {
                $lockAge = time() - filemtime($this->lockFile);
                if ($lockAge > 30) {
                    // Stale lock, remove it
                    @unlink($this->lockFile);
                }
            }
            
            // Try to acquire lock atomically
            $handle = @fopen($this->lockFile, 'x');
            if ($handle !== false) {
                fclose($handle);
                return true;
            }
            
            $attempts++;
            usleep(100000); // 100ms
        }
        
        // Could not acquire lock
        return false;
    }
    
    /**
     * Release queue lock
     * 
     * @return void
     */
    protected function unlock(): void
    {
        if (file_exists($this->lockFile)) {
            unlink($this->lockFile);
        }
    }
}