package queuefs
import (
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
log "github.com/sirupsen/logrus"
)
type SQLiteQueueBackend struct {
db *sql.DB
}
func NewSQLiteQueueBackend() *SQLiteQueueBackend {
return &SQLiteQueueBackend{}
}
func (b *SQLiteQueueBackend) Initialize(config map[string]interface{}) error {
dbBackend := NewSQLiteDBBackend()
db, err := dbBackend.Open(config)
if err != nil {
return fmt.Errorf("failed to open SQLite database: %w", err)
}
b.db = db
for _, sqlStmt := range dbBackend.GetInitSQL() {
if _, err := db.Exec(sqlStmt); err != nil {
db.Close()
return fmt.Errorf("failed to initialize schema: %w", err)
}
}
b.runMigrations()
if n, err := b.RecoverStale(0); err != nil {
log.Warnf("[queuefs] Failed to recover stale messages on startup: %v", err)
} else if n > 0 {
log.Infof("[queuefs] Recovered %d in-flight message(s) from previous run", n)
}
log.Info("[queuefs] SQLite backend initialized")
return nil
}
func (b *SQLiteQueueBackend) runMigrations() {
migrations := []string{
`ALTER TABLE queue_messages ADD COLUMN status TEXT NOT NULL DEFAULT 'pending'`,
`ALTER TABLE queue_messages ADD COLUMN processing_started_at INTEGER`,
`CREATE INDEX IF NOT EXISTS idx_queue_status ON queue_messages(queue_name, status, id)`,
`CREATE INDEX IF NOT EXISTS idx_queue_message_id ON queue_messages(queue_name, message_id)`,
}
for _, stmt := range migrations {
if _, err := b.db.Exec(stmt); err != nil {
if !strings.Contains(err.Error(), "duplicate column name") &&
!strings.Contains(err.Error(), "already exists") {
log.Warnf("[queuefs] Migration warning: %v", err)
}
}
}
}
func (b *SQLiteQueueBackend) Close() error {
if b.db != nil {
return b.db.Close()
}
return nil
}
func (b *SQLiteQueueBackend) GetType() string {
return "sqlite"
}
func (b *SQLiteQueueBackend) Enqueue(queueName string, msg QueueMessage) error {
msgData, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
_, err = b.db.Exec(
"INSERT INTO queue_messages (queue_name, message_id, data, timestamp, status) VALUES (?, ?, ?, ?, 'pending')",
queueName, msg.ID, string(msgData), msg.Timestamp.Unix(),
)
if err != nil {
return fmt.Errorf("failed to enqueue message: %w", err)
}
return nil
}
func (b *SQLiteQueueBackend) Dequeue(queueName string) (QueueMessage, bool, error) {
tx, err := b.db.Begin()
if err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to start transaction: %w", err)
}
defer tx.Rollback()
var id int64
var data string
err = tx.QueryRow(
"SELECT id, data FROM queue_messages WHERE queue_name = ? AND status = 'pending' ORDER BY id LIMIT 1",
queueName,
).Scan(&id, &data)
if err == sql.ErrNoRows {
return QueueMessage{}, false, nil
} else if err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to query message: %w", err)
}
_, err = tx.Exec(
"UPDATE queue_messages SET status = 'processing', processing_started_at = ? WHERE id = ?",
time.Now().Unix(), id,
)
if err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to mark message as processing: %w", err)
}
if err := tx.Commit(); err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to commit transaction: %w", err)
}
var msg QueueMessage
if err := json.Unmarshal([]byte(data), &msg); err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to unmarshal message: %w", err)
}
return msg, true, nil
}
func (b *SQLiteQueueBackend) Ack(queueName string, messageID string) error {
result, err := b.db.Exec(
"DELETE FROM queue_messages WHERE queue_name = ? AND message_id = ? AND status = 'processing'",
queueName, messageID,
)
if err != nil {
return fmt.Errorf("failed to ack message: %w", err)
}
rows, _ := result.RowsAffected()
if rows == 0 {
log.Warnf("[queuefs] Ack found no matching processing message: queue=%s msg=%s", queueName, messageID)
}
return nil
}
func (b *SQLiteQueueBackend) RecoverStale(staleSec int64) (int, error) {
cutoff := time.Now().Unix() - staleSec
result, err := b.db.Exec(
"UPDATE queue_messages SET status = 'pending', processing_started_at = NULL WHERE status = 'processing' AND processing_started_at <= ?",
cutoff,
)
if err != nil {
return 0, fmt.Errorf("failed to recover stale messages: %w", err)
}
n, _ := result.RowsAffected()
return int(n), nil
}
func (b *SQLiteQueueBackend) Peek(queueName string) (QueueMessage, bool, error) {
var data string
err := b.db.QueryRow(
"SELECT data FROM queue_messages WHERE queue_name = ? AND status = 'pending' ORDER BY id LIMIT 1",
queueName,
).Scan(&data)
if err == sql.ErrNoRows {
return QueueMessage{}, false, nil
} else if err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to peek message: %w", err)
}
var msg QueueMessage
if err := json.Unmarshal([]byte(data), &msg); err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to unmarshal message: %w", err)
}
return msg, true, nil
}
func (b *SQLiteQueueBackend) Size(queueName string) (int, error) {
var count int
err := b.db.QueryRow(
"SELECT COUNT(*) FROM queue_messages WHERE queue_name = ? AND status = 'pending'",
queueName,
).Scan(&count)
if err != nil {
return 0, fmt.Errorf("failed to get queue size: %w", err)
}
return count, nil
}
func (b *SQLiteQueueBackend) Clear(queueName string) error {
_, err := b.db.Exec("DELETE FROM queue_messages WHERE queue_name = ?", queueName)
if err != nil {
return fmt.Errorf("failed to clear queue: %w", err)
}
return nil
}
func (b *SQLiteQueueBackend) ListQueues(prefix string) ([]string, error) {
var query string
var args []interface{}
if prefix == "" {
query = "SELECT queue_name FROM queue_metadata"
} else {
query = "SELECT queue_name FROM queue_metadata WHERE queue_name = ? OR queue_name LIKE ?"
args = []interface{}{prefix, prefix + "/%"}
}
rows, err := b.db.Query(query, args...)
if err != nil {
return nil, fmt.Errorf("failed to list queues: %w", err)
}
defer rows.Close()
var queues []string
for rows.Next() {
var qName string
if err := rows.Scan(&qName); err != nil {
return nil, fmt.Errorf("failed to scan queue name: %w", err)
}
queues = append(queues, qName)
}
return queues, nil
}
func (b *SQLiteQueueBackend) GetLastEnqueueTime(queueName string) (time.Time, error) {
var timestamp sql.NullInt64
err := b.db.QueryRow(
"SELECT MAX(timestamp) FROM queue_messages WHERE queue_name = ? AND status = 'pending'",
queueName,
).Scan(×tamp)
if err != nil || !timestamp.Valid {
return time.Time{}, nil
}
return time.Unix(timestamp.Int64, 0), nil
}
func (b *SQLiteQueueBackend) RemoveQueue(queueName string) error {
if queueName == "" {
if _, err := b.db.Exec("DELETE FROM queue_messages"); err != nil {
return err
}
_, err := b.db.Exec("DELETE FROM queue_metadata")
return err
}
if _, err := b.db.Exec(
"DELETE FROM queue_messages WHERE queue_name = ? OR queue_name LIKE ?",
queueName, queueName+"/%",
); err != nil {
return fmt.Errorf("failed to remove queue messages: %w", err)
}
_, err := b.db.Exec(
"DELETE FROM queue_metadata WHERE queue_name = ? OR queue_name LIKE ?",
queueName, queueName+"/%",
)
return err
}
func (b *SQLiteQueueBackend) CreateQueue(queueName string) error {
_, err := b.db.Exec(
"INSERT OR IGNORE INTO queue_metadata (queue_name) VALUES (?)",
queueName,
)
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}
log.Infof("[queuefs] Created queue '%s' (SQLite)", queueName)
return nil
}
func (b *SQLiteQueueBackend) QueueExists(queueName string) (bool, error) {
var count int
err := b.db.QueryRow(
"SELECT COUNT(*) FROM queue_metadata WHERE queue_name = ?",
queueName,
).Scan(&count)
if err != nil {
return false, fmt.Errorf("failed to check queue existence: %w", err)
}
return count > 0, nil
}