package queuefs
import (
"database/sql"
"encoding/json"
"fmt"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type QueueBackend interface {
Initialize(config map[string]interface{}) error
Close() error
GetType() string
Enqueue(queueName string, msg QueueMessage) error
Dequeue(queueName string) (QueueMessage, bool, error)
Ack(queueName string, messageID string) error
RecoverStale(staleSec int64) (int, error)
Peek(queueName string) (QueueMessage, bool, error)
Size(queueName string) (int, error)
Clear(queueName string) error
ListQueues(prefix string) ([]string, error)
GetLastEnqueueTime(queueName string) (time.Time, error)
RemoveQueue(queueName string) error
CreateQueue(queueName string) error
QueueExists(queueName string) (bool, error)
}
type MemoryBackend struct {
queues map[string]*Queue
}
func NewMemoryBackend() *MemoryBackend {
return &MemoryBackend{
queues: make(map[string]*Queue),
}
}
func (b *MemoryBackend) Initialize(config map[string]interface{}) error {
return nil
}
func (b *MemoryBackend) Close() error {
b.queues = nil
return nil
}
func (b *MemoryBackend) GetType() string {
return "memory"
}
func (b *MemoryBackend) getOrCreateQueue(queueName string) *Queue {
if queue, exists := b.queues[queueName]; exists {
return queue
}
queue := &Queue{
messages: []QueueMessage{},
lastEnqueueTime: time.Time{},
}
b.queues[queueName] = queue
return queue
}
func (b *MemoryBackend) Enqueue(queueName string, msg QueueMessage) error {
queue := b.getOrCreateQueue(queueName)
queue.mu.Lock()
defer queue.mu.Unlock()
queue.messages = append(queue.messages, msg)
if msg.Timestamp.After(queue.lastEnqueueTime) {
queue.lastEnqueueTime = msg.Timestamp
} else {
queue.lastEnqueueTime = queue.lastEnqueueTime.Add(1 * time.Nanosecond)
}
return nil
}
func (b *MemoryBackend) Dequeue(queueName string) (QueueMessage, bool, error) {
queue, exists := b.queues[queueName]
if !exists {
return QueueMessage{}, false, nil
}
queue.mu.Lock()
defer queue.mu.Unlock()
if len(queue.messages) == 0 {
return QueueMessage{}, false, nil
}
msg := queue.messages[0]
queue.messages = queue.messages[1:]
return msg, true, nil
}
func (b *MemoryBackend) Ack(queueName string, messageID string) error {
return nil
}
func (b *MemoryBackend) RecoverStale(staleSec int64) (int, error) {
return 0, nil
}
func (b *MemoryBackend) Peek(queueName string) (QueueMessage, bool, error) {
queue, exists := b.queues[queueName]
if !exists {
return QueueMessage{}, false, nil
}
queue.mu.Lock()
defer queue.mu.Unlock()
if len(queue.messages) == 0 {
return QueueMessage{}, false, nil
}
return queue.messages[0], true, nil
}
func (b *MemoryBackend) Size(queueName string) (int, error) {
queue, exists := b.queues[queueName]
if !exists {
return 0, nil
}
queue.mu.Lock()
defer queue.mu.Unlock()
return len(queue.messages), nil
}
func (b *MemoryBackend) Clear(queueName string) error {
queue, exists := b.queues[queueName]
if !exists {
return nil
}
queue.mu.Lock()
defer queue.mu.Unlock()
queue.messages = []QueueMessage{}
queue.lastEnqueueTime = time.Time{}
return nil
}
func (b *MemoryBackend) ListQueues(prefix string) ([]string, error) {
var queues []string
for qName := range b.queues {
if prefix == "" || qName == prefix || len(qName) > len(prefix) && qName[:len(prefix)+1] == prefix+"/" {
queues = append(queues, qName)
}
}
return queues, nil
}
func (b *MemoryBackend) GetLastEnqueueTime(queueName string) (time.Time, error) {
queue, exists := b.queues[queueName]
if !exists {
return time.Time{}, nil
}
queue.mu.Lock()
defer queue.mu.Unlock()
return queue.lastEnqueueTime, nil
}
func (b *MemoryBackend) RemoveQueue(queueName string) error {
if queueName == "" {
b.queues = make(map[string]*Queue)
return nil
}
delete(b.queues, queueName)
prefix := queueName + "/"
for qName := range b.queues {
if len(qName) > len(prefix) && qName[:len(prefix)] == prefix {
delete(b.queues, qName)
}
}
return nil
}
func (b *MemoryBackend) CreateQueue(queueName string) error {
b.getOrCreateQueue(queueName)
return nil
}
func (b *MemoryBackend) QueueExists(queueName string) (bool, error) {
_, exists := b.queues[queueName]
return exists, nil
}
type TiDBBackend struct {
db *sql.DB
backend DBBackend
backendType string
tableCache map[string]string
cacheMu sync.RWMutex
}
func NewTiDBBackend() *TiDBBackend {
return &TiDBBackend{
tableCache: make(map[string]string),
}
}
func (b *TiDBBackend) Initialize(config map[string]interface{}) error {
backendType := "memory"
if val, ok := config["backend"]; ok {
if strVal, ok := val.(string); ok {
backendType = strVal
}
}
b.backendType = backendType
backend, err := CreateBackend(config)
if err != nil {
return fmt.Errorf("failed to create backend: %w", err)
}
b.backend = backend
db, err := backend.Open(config)
if err != nil {
return fmt.Errorf("failed to open database: %w", err)
}
b.db = db
for _, sqlStmt := range backend.GetInitSQL() {
if _, err := db.Exec(sqlStmt); err != nil {
db.Close()
return fmt.Errorf("failed to initialize schema: %w", err)
}
}
return nil
}
func (b *TiDBBackend) Close() error {
if b.db != nil {
return b.db.Close()
}
return nil
}
func (b *TiDBBackend) GetType() string {
return b.backendType
}
func (b *TiDBBackend) getTableName(queueName string, forceRefresh bool) (string, error) {
if !forceRefresh {
b.cacheMu.RLock()
if tableName, exists := b.tableCache[queueName]; exists {
b.cacheMu.RUnlock()
return tableName, nil
}
b.cacheMu.RUnlock()
}
var tableName string
err := b.db.QueryRow(
"SELECT table_name FROM queuefs_registry WHERE queue_name = ?",
queueName,
).Scan(&tableName)
if err != nil {
return "", err
}
b.cacheMu.Lock()
b.tableCache[queueName] = tableName
b.cacheMu.Unlock()
return tableName, nil
}
func (b *TiDBBackend) invalidateCache(queueName string) {
b.cacheMu.Lock()
delete(b.tableCache, queueName)
b.cacheMu.Unlock()
}
func (b *TiDBBackend) Enqueue(queueName string, msg QueueMessage) error {
msgData, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
tableName, err := b.getTableName(queueName, false)
if err == sql.ErrNoRows {
return fmt.Errorf("queue does not exist: %s (create it with mkdir first)", queueName)
} else if err != nil {
return fmt.Errorf("failed to get queue table name: %w", err)
}
insertSQL := fmt.Sprintf(
"INSERT INTO %s (message_id, data, timestamp, deleted) VALUES (?, ?, ?, 0)",
tableName,
)
_, err = b.db.Exec(insertSQL, msg.ID, string(msgData), msg.Timestamp.Unix())
if err != nil {
return fmt.Errorf("failed to enqueue message: %w", err)
}
return nil
}
func (b *TiDBBackend) Ack(queueName string, messageID string) error {
return nil
}
func (b *TiDBBackend) RecoverStale(staleSec int64) (int, error) {
return 0, nil
}
func (b *TiDBBackend) Dequeue(queueName string) (QueueMessage, bool, error) {
tableName, err := b.getTableName(queueName, false)
if err == sql.ErrNoRows {
return QueueMessage{}, false, nil
} else if err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to get queue table name: %w", err)
}
tx, err := b.db.Begin()
if err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to start transaction: %w", err)
}
defer tx.Rollback()
var id int64
var data string
querySQL := fmt.Sprintf(
"SELECT id, data FROM %s WHERE deleted = 0 ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED",
tableName,
)
err = tx.QueryRow(querySQL).Scan(&id, &data)
if err == sql.ErrNoRows {
return QueueMessage{}, false, nil
} else if err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to query message: %w", err)
}
updateSQL := fmt.Sprintf(
"UPDATE %s SET deleted = 1, deleted_at = CURRENT_TIMESTAMP WHERE id = ?",
tableName,
)
_, err = tx.Exec(updateSQL, id)
if err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to mark message as deleted: %w", err)
}
if err := tx.Commit(); err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to commit transaction: %w", err)
}
var msg QueueMessage
if err := json.Unmarshal([]byte(data), &msg); err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to unmarshal message: %w", err)
}
return msg, true, nil
}
func (b *TiDBBackend) Peek(queueName string) (QueueMessage, bool, error) {
tableName, err := b.getTableName(queueName, false)
if err == sql.ErrNoRows {
return QueueMessage{}, false, nil
} else if err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to get queue table name: %w", err)
}
var data string
querySQL := fmt.Sprintf(
"SELECT data FROM %s WHERE deleted = 0 ORDER BY id LIMIT 1",
tableName,
)
err = b.db.QueryRow(querySQL).Scan(&data)
if err == sql.ErrNoRows {
return QueueMessage{}, false, nil
} else if err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to peek message: %w", err)
}
var msg QueueMessage
if err := json.Unmarshal([]byte(data), &msg); err != nil {
return QueueMessage{}, false, fmt.Errorf("failed to unmarshal message: %w", err)
}
return msg, true, nil
}
func (b *TiDBBackend) Size(queueName string) (int, error) {
tableName, err := b.getTableName(queueName, false)
if err == sql.ErrNoRows {
return 0, nil
} else if err != nil {
return 0, fmt.Errorf("failed to get queue table name: %w", err)
}
var count int
querySQL := fmt.Sprintf(
"SELECT COUNT(*) FROM %s WHERE deleted = 0",
tableName,
)
err = b.db.QueryRow(querySQL).Scan(&count)
if err != nil {
return 0, fmt.Errorf("failed to get queue size: %w", err)
}
return count, nil
}
func (b *TiDBBackend) Clear(queueName string) error {
tableName, err := b.getTableName(queueName, false)
if err == sql.ErrNoRows {
return nil
} else if err != nil {
return fmt.Errorf("failed to get queue table name: %w", err)
}
deleteSQL := fmt.Sprintf("DELETE FROM %s", tableName)
_, err = b.db.Exec(deleteSQL)
if err != nil {
return fmt.Errorf("failed to clear queue: %w", err)
}
return nil
}
func (b *TiDBBackend) ListQueues(prefix string) ([]string, error) {
var query string
var args []interface{}
if prefix == "" {
query = "SELECT queue_name FROM queuefs_registry"
} else {
query = "SELECT queue_name FROM queuefs_registry WHERE queue_name = ? OR queue_name LIKE ?"
args = []interface{}{prefix, prefix + "/%"}
}
rows, err := b.db.Query(query, args...)
if err != nil {
return nil, fmt.Errorf("failed to list queues: %w", err)
}
defer rows.Close()
var queues []string
for rows.Next() {
var qName string
if err := rows.Scan(&qName); err != nil {
return nil, fmt.Errorf("failed to scan queue name: %w", err)
}
queues = append(queues, qName)
}
return queues, nil
}
func (b *TiDBBackend) GetLastEnqueueTime(queueName string) (time.Time, error) {
tableName, err := b.getTableName(queueName, false)
if err == sql.ErrNoRows {
return time.Time{}, nil
} else if err != nil {
return time.Time{}, fmt.Errorf("failed to get queue table name: %w", err)
}
var timestamp int64
querySQL := fmt.Sprintf(
"SELECT MAX(timestamp) FROM %s WHERE deleted = 0",
tableName,
)
err = b.db.QueryRow(querySQL).Scan(×tamp)
if err == sql.ErrNoRows || timestamp == 0 {
return time.Time{}, nil
} else if err != nil {
return time.Time{}, fmt.Errorf("failed to get last enqueue time: %w", err)
}
return time.Unix(timestamp, 0), nil
}
func (b *TiDBBackend) RemoveQueue(queueName string) error {
if queueName == "" {
rows, err := b.db.Query("SELECT queue_name, table_name FROM queuefs_registry")
if err != nil {
return fmt.Errorf("failed to list queues: %w", err)
}
defer rows.Close()
var queuesToDelete []struct {
queueName string
tableName string
}
for rows.Next() {
var qName, tName string
if err := rows.Scan(&qName, &tName); err != nil {
return fmt.Errorf("failed to scan queue: %w", err)
}
queuesToDelete = append(queuesToDelete, struct {
queueName string
tableName string
}{qName, tName})
}
for _, q := range queuesToDelete {
dropSQL := fmt.Sprintf("DROP TABLE IF EXISTS %s", q.tableName)
if _, err := b.db.Exec(dropSQL); err != nil {
log.Warnf("[queuefs] Failed to drop table '%s': %v", q.tableName, err)
}
}
b.cacheMu.Lock()
b.tableCache = make(map[string]string)
b.cacheMu.Unlock()
_, err = b.db.Exec("DELETE FROM queuefs_registry")
return err
}
rows, err := b.db.Query(
"SELECT queue_name, table_name FROM queuefs_registry WHERE queue_name = ? OR queue_name LIKE ?",
queueName, queueName+"/%",
)
if err != nil {
return fmt.Errorf("failed to query queues: %w", err)
}
defer rows.Close()
var queuesToDelete []struct {
queueName string
tableName string
}
for rows.Next() {
var qName, tName string
if err := rows.Scan(&qName, &tName); err != nil {
return fmt.Errorf("failed to scan queue: %w", err)
}
queuesToDelete = append(queuesToDelete, struct {
queueName string
tableName string
}{qName, tName})
}
for _, q := range queuesToDelete {
dropSQL := fmt.Sprintf("DROP TABLE IF EXISTS %s", q.tableName)
if _, err := b.db.Exec(dropSQL); err != nil {
log.Warnf("[queuefs] Failed to drop table '%s': %v", q.tableName, err)
} else {
log.Infof("[queuefs] Dropped queue table '%s' for queue '%s'", q.tableName, q.queueName)
}
b.invalidateCache(q.queueName)
}
_, err = b.db.Exec(
"DELETE FROM queuefs_registry WHERE queue_name = ? OR queue_name LIKE ?",
queueName, queueName+"/%",
)
return err
}
func (b *TiDBBackend) CreateQueue(queueName string) error {
tableName := sanitizeTableName(queueName)
createTableSQL := getCreateTableSQL(tableName)
if _, err := b.db.Exec(createTableSQL); err != nil {
return fmt.Errorf("failed to create queue table: %w", err)
}
_, err := b.db.Exec(
"INSERT IGNORE INTO queuefs_registry (queue_name, table_name) VALUES (?, ?)",
queueName, tableName,
)
if err != nil {
return fmt.Errorf("failed to register queue: %w", err)
}
b.cacheMu.Lock()
b.tableCache[queueName] = tableName
b.cacheMu.Unlock()
log.Infof("[queuefs] Created queue table '%s' for queue '%s'", tableName, queueName)
return nil
}
func (b *TiDBBackend) QueueExists(queueName string) (bool, error) {
b.cacheMu.RLock()
_, exists := b.tableCache[queueName]
b.cacheMu.RUnlock()
if exists {
return true, nil
}
var count int
err := b.db.QueryRow(
"SELECT COUNT(*) FROM queuefs_registry WHERE queue_name = ?",
queueName,
).Scan(&count)
if err != nil {
return false, fmt.Errorf("failed to check queue existence: %w", err)
}
return count > 0, nil
}