package api
import (
"context"
"fmt"
"sync"
"time"
"github.com/c4pt0r/agfs/agfs-server/pkg/filesystem"
log "github.com/sirupsen/logrus"
"github.com/tetratelabs/wazero"
wazeroapi "github.com/tetratelabs/wazero/api"
)
// PoolConfig holds the tunables for a WASMInstancePool.
type PoolConfig struct {
// MaxInstances is the upper bound on live instances and the capacity of the
// idle channel. NewWASMInstancePool replaces values <= 0 with 10.
MaxInstances int
// InstanceMaxLifetime is the age after which an instance is recycled the
// next time it is taken from the pool. Values <= 0 disable the age check.
InstanceMaxLifetime time.Duration
// InstanceMaxRequests is the request count at which an instance is recycled
// the next time it is taken from the pool. Values <= 0 disable the check.
InstanceMaxRequests int64
// HealthCheckInterval is the period of the background health check. The
// health check goroutine is only started when this is > 0.
HealthCheckInterval time.Duration
// AcquireTimeout is how long Acquire waits for an idle instance once the
// pool is at MaxInstances. NewWASMInstancePool replaces 0 with 30 seconds.
AcquireTimeout time.Duration
// EnableStatistics, when true, makes Acquire record counters in PoolStats.
EnableStatistics bool
}
// WASMInstancePool manages a bounded set of instances of one compiled WASM
// module. Idle instances sit in a buffered channel; callers check them out
// with Acquire and hand them back with Release.
type WASMInstancePool struct {
// ctx is passed to every wazero call made by the pool and also stops the
// health check loop when it is cancelled.
ctx context.Context
// runtime instantiates new module instances from compiledModule.
runtime wazero.Runtime
compiledModule wazero.CompiledModule
// hostFS is stored by NewWASMInstancePool; no other code visible in this
// file reads it.
hostFS filesystem.FileSystem
// pluginName is only used to label log messages.
pluginName string
config PoolConfig
// instances holds idle instances; its capacity is config.MaxInstances.
instances chan *WASMModuleInstance
// currentInstances counts instances that have been created and not yet
// destroyed (idle or checked out). It is modified while holding mu.
currentInstances int
// mu protects currentInstances and closed.
mu sync.Mutex
stats PoolStats
// closed is set by Close; Acquire refuses to hand out instances afterwards.
closed bool
}
// PoolStats is the set of counters a WASMInstancePool maintains about its
// own activity.
type PoolStats struct {
// TotalCreated counts instances successfully created by Acquire.
TotalCreated int64
// TotalDestroyed counts instances destroyed by recycling or by Release.
TotalDestroyed int64
// CurrentActive is incremented on creation and decremented on destruction.
CurrentActive int64
// TotalWaits counts the times Acquire found the pool at capacity and had
// to wait for an instance to be released.
TotalWaits int64
// TotalRequests is incremented on every invocation of Acquire on an open pool.
TotalRequests int64
// FailedRequests counts Acquire calls that failed because instance
// creation returned an error or the wait timed out.
FailedRequests int64
// mu protects the counters above.
mu sync.Mutex
}
// SharedBufferInfo records the values a module reports through its
// get_input_buffer_ptr, get_output_buffer_ptr and get_shared_buffer_size
// exports (see initializeSharedBuffer).
type SharedBufferInfo struct {
// InputBufferPtr is the first result of get_input_buffer_ptr, truncated to uint32.
InputBufferPtr uint32
// OutputBufferPtr is the first result of get_output_buffer_ptr, truncated to uint32.
OutputBufferPtr uint32
// BufferSize is the first result of get_shared_buffer_size, truncated to uint32.
BufferSize uint32
// Enabled is true only when all three exports exist and every call
// succeeded with at least one result.
Enabled bool
}
// WASMModuleInstance is one instantiated module managed by a WASMInstancePool,
// together with the bookkeeping used to decide when to recycle it.
type WASMModuleInstance struct {
// module is the instantiated wazero module; it is closed by destroyInstance.
module wazeroapi.Module
// fileSystem is the WASMFileSystem bound to module; ExecuteFS hands it to
// its callback.
fileSystem *WASMFileSystem
// sharedBuffer is the shared-buffer description captured at creation time.
sharedBuffer SharedBufferInfo
// createdAt is compared against PoolConfig.InstanceMaxLifetime.
createdAt time.Time
// requestCount is incremented each time Acquire hands out this instance
// and is compared against PoolConfig.InstanceMaxRequests.
requestCount int64
// mu is held while requestCount is updated and while the recycle checks run.
mu sync.Mutex
}
// NewWASMInstancePool builds a pool that lazily instantiates compiledModule
// on runtime, up to config.MaxInstances instances.
//
// Defaults are applied to the supplied config before it is stored: a
// MaxInstances <= 0 becomes 10 and an AcquireTimeout of 0 becomes 30 seconds.
// When config.HealthCheckInterval is positive a background goroutine running
// the health check loop is started; it stops when ctx is cancelled.
//
// No instance is created here; instances are created on demand by Acquire.
func NewWASMInstancePool(ctx context.Context, runtime wazero.Runtime, compiledModule wazero.CompiledModule,
	pluginName string, config PoolConfig, hostFS filesystem.FileSystem) *WASMInstancePool {
	// Fill in defaults for unset or invalid limits.
	if config.MaxInstances < 1 {
		config.MaxInstances = 10
	}
	if config.AcquireTimeout == 0 {
		config.AcquireTimeout = 30 * time.Second
	}

	// The idle channel can hold every instance the pool is allowed to own.
	idle := make(chan *WASMModuleInstance, config.MaxInstances)

	pool := &WASMInstancePool{
		instances:      idle,
		config:         config,
		pluginName:     pluginName,
		hostFS:         hostFS,
		compiledModule: compiledModule,
		runtime:        runtime,
		ctx:            ctx,
	}

	log.Infof("Created WASM instance pool for %s (max_instances=%d, max_lifetime=%v, max_requests=%d)",
		pluginName, config.MaxInstances, config.InstanceMaxLifetime, config.InstanceMaxRequests)

	if config.HealthCheckInterval <= 0 {
		return pool
	}
	go pool.healthCheckLoop()
	return pool
}
// healthCheckLoop runs performHealthCheck every config.HealthCheckInterval.
//
// It is started as a goroutine by NewWASMInstancePool and returns when the
// pool's context is cancelled or, at the latest one tick after Close, when the
// pool has been closed. Without the closed check the goroutine would keep
// running for as long as the context stays alive, even though the pool is no
// longer usable.
func (p *WASMInstancePool) healthCheckLoop() {
	ticker := time.NewTicker(p.config.HealthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-p.ctx.Done():
			return
		case <-ticker.C:
			p.mu.Lock()
			closed := p.closed
			p.mu.Unlock()
			if closed {
				// Nothing left to monitor; stop instead of leaking the goroutine.
				return
			}
			p.performHealthCheck()
		}
	}
}
// performHealthCheck logs the current number of live instances at debug
// level. It does nothing once the pool has been closed.
func (p *WASMInstancePool) performHealthCheck() {
	// Snapshot both fields under the lock: currentInstances is modified by
	// Acquire and Release on other goroutines, so reading it unlocked is a
	// data race.
	p.mu.Lock()
	closed := p.closed
	active := p.currentInstances
	p.mu.Unlock()

	if closed {
		return
	}
	log.Debugf("[Pool %s] Health check: active instances=%d/%d",
		p.pluginName, active, p.config.MaxInstances)
}
// Acquire checks out an instance for use by the caller.
//
// It tries, in order:
//  1. to take an idle instance from the pool without blocking;
//  2. to create a new instance if fewer than config.MaxInstances exist;
//  3. to wait up to config.AcquireTimeout for another caller to Release one.
//
// An instance taken from the pool that has exceeded its lifetime or request
// budget is destroyed and the sequence is retried. The timeout is armed once
// per call, so retries never extend the total wait, and TotalRequests is
// counted once per call.
//
// It returns an error when the pool is closed (including when it is closed
// while this call is waiting), when creating an instance fails, or when the
// wait times out. Each instance returned should be handed back with Release.
func (p *WASMInstancePool) Acquire() (*WASMModuleInstance, error) {
	p.mu.Lock()
	closed := p.closed
	p.mu.Unlock()
	if closed {
		return nil, fmt.Errorf("instance pool is closed")
	}

	p.recordStats(func(s *PoolStats) { s.TotalRequests++ })

	// The wait timer is created lazily the first time the pool is found
	// exhausted and is shared by all subsequent retries of this call.
	var timeout *time.Timer
	defer func() {
		if timeout != nil {
			timeout.Stop()
		}
	}()

	for {
		// Fast path: reuse an idle instance if one is available right now.
		select {
		case instance, ok := <-p.instances:
			if !ok {
				// Close closed the channel after the check above.
				return nil, p.failClosed()
			}
			if p.shouldRecycleInstance(instance) {
				p.recycleExpired(instance)
				continue
			}
			log.Debugf("Reusing WASM instance from pool for %s", p.pluginName)
			return p.markAcquired(instance), nil
		default:
		}

		// Reserve a slot under the lock before the (slow) instantiation so
		// that concurrent callers cannot exceed MaxInstances.
		p.mu.Lock()
		canCreate := p.currentInstances < p.config.MaxInstances
		if canCreate {
			p.currentInstances++
		}
		total := p.currentInstances
		p.mu.Unlock()

		if canCreate {
			instance, err := p.createInstance()
			if err != nil {
				// Give the reserved slot back.
				p.mu.Lock()
				p.currentInstances--
				p.mu.Unlock()
				p.recordStats(func(s *PoolStats) { s.FailedRequests++ })
				return nil, err
			}
			p.recordStats(func(s *PoolStats) {
				s.TotalCreated++
				s.CurrentActive++
			})
			log.Debugf("Created new WASM instance for %s (total: %d/%d)",
				p.pluginName, total, p.config.MaxInstances)
			return p.markAcquired(instance), nil
		}

		// Pool exhausted: block until an instance is released or we time out.
		if timeout == nil {
			log.Debugf("WASM pool full for %s, waiting for available instance...", p.pluginName)
			p.recordStats(func(s *PoolStats) { s.TotalWaits++ })
			timeout = time.NewTimer(p.config.AcquireTimeout)
		}

		select {
		case instance, ok := <-p.instances:
			if !ok {
				return nil, p.failClosed()
			}
			if p.shouldRecycleInstance(instance) {
				// Destroying it frees a slot, so the next iteration can
				// create a fresh instance.
				p.recycleExpired(instance)
				continue
			}
			return p.markAcquired(instance), nil
		case <-timeout.C:
			p.recordStats(func(s *PoolStats) { s.FailedRequests++ })
			return nil, fmt.Errorf("timeout waiting for available WASM instance after %v", p.config.AcquireTimeout)
		}
	}
}

// recordStats applies update to the pool statistics under the stats lock.
// It is a no-op when statistics are disabled.
func (p *WASMInstancePool) recordStats(update func(*PoolStats)) {
	if !p.config.EnableStatistics {
		return
	}
	p.stats.mu.Lock()
	defer p.stats.mu.Unlock()
	update(&p.stats)
}

// recycleExpired destroys an instance that exceeded its limits and releases
// its slot in the pool accounting.
func (p *WASMInstancePool) recycleExpired(instance *WASMModuleInstance) {
	log.Debugf("Recycling expired WASM instance for %s", p.pluginName)
	p.destroyInstance(instance)

	p.mu.Lock()
	p.currentInstances--
	p.mu.Unlock()

	p.recordStats(func(s *PoolStats) {
		s.TotalDestroyed++
		s.CurrentActive--
	})
}

// markAcquired bumps the instance's request counter and returns the instance.
func (p *WASMInstancePool) markAcquired(instance *WASMModuleInstance) *WASMModuleInstance {
	instance.mu.Lock()
	instance.requestCount++
	instance.mu.Unlock()
	return instance
}

// failClosed records a failed request and returns the pool-closed error for
// an Acquire that observed the pool being closed after it was admitted.
func (p *WASMInstancePool) failClosed() error {
	p.recordStats(func(s *PoolStats) { s.FailedRequests++ })
	return fmt.Errorf("instance pool is closed")
}
// shouldRecycleInstance reports whether instance has outlived the limits in
// the pool configuration: it is older than InstanceMaxLifetime, or it has
// served at least InstanceMaxRequests requests. A limit that is zero or
// negative is ignored. The instance lock is held for the duration of the check.
func (p *WASMInstancePool) shouldRecycleInstance(instance *WASMModuleInstance) bool {
	instance.mu.Lock()
	defer instance.mu.Unlock()

	// Age limit.
	if maxAge := p.config.InstanceMaxLifetime; maxAge > 0 {
		if lived := time.Since(instance.createdAt); lived > maxAge {
			log.Debugf("Instance exceeded max lifetime: %v > %v", lived, maxAge)
			return true
		}
	}

	// Request budget.
	budget := p.config.InstanceMaxRequests
	if budget <= 0 || instance.requestCount < budget {
		return false
	}
	log.Debugf("Instance exceeded max requests: %d >= %d", instance.requestCount, budget)
	return true
}
// Release hands an instance obtained from Acquire back to the pool.
//
// A nil instance is ignored. If the pool is still open and has room, the
// instance becomes available to other callers; otherwise (pool closed, or the
// idle channel is unexpectedly full) the instance is destroyed and removed
// from the pool accounting. Statistics are only touched when
// config.EnableStatistics is set, matching Acquire.
func (p *WASMInstancePool) Release(instance *WASMModuleInstance) {
	if instance == nil {
		return
	}

	// Hold p.mu across the closed check and the non-blocking send. Close sets
	// p.closed under this mutex before it closes the channel, so while we
	// hold the lock and observe closed == false the channel is guaranteed to
	// still be open and the send cannot panic.
	p.mu.Lock()
	closed := p.closed
	returned := false
	if !closed {
		select {
		case p.instances <- instance:
			returned = true
		default:
		}
	}
	p.mu.Unlock()

	if returned {
		log.Debugf("Returned WASM instance to pool for %s", p.pluginName)
		return
	}

	if closed {
		log.Debugf("Pool closed, destroying released WASM instance for %s", p.pluginName)
	} else {
		log.Debugf("Pool full, destroying excess WASM instance for %s", p.pluginName)
	}
	p.destroyInstance(instance)

	p.mu.Lock()
	p.currentInstances--
	p.mu.Unlock()

	if p.config.EnableStatistics {
		p.stats.mu.Lock()
		p.stats.TotalDestroyed++
		p.stats.CurrentActive--
		p.stats.mu.Unlock()
	}
}
// createInstance instantiates the pool's compiled module and wraps it in a
// WASMModuleInstance.
//
// If the module exports plugin_new it is called once; a failure there closes
// the module and is returned as an error. The shared-buffer description is
// then probed via initializeSharedBuffer and attached both to the instance and
// to its WASMFileSystem.
func (p *WASMInstancePool) createInstance() (*WASMModuleInstance, error) {
	mod, err := p.runtime.InstantiateModule(p.ctx, p.compiledModule, wazero.NewModuleConfig())
	if err != nil {
		return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
	}

	// plugin_new is an optional initialization hook.
	initFn := mod.ExportedFunction("plugin_new")
	if initFn != nil {
		if _, callErr := initFn.Call(p.ctx); callErr != nil {
			mod.Close(p.ctx)
			return nil, fmt.Errorf("failed to call plugin_new: %w", callErr)
		}
	}

	buf := initializeSharedBuffer(mod, p.ctx)

	// The file system receives a pointer to the local description while the
	// instance stores its own copy.
	fs := &WASMFileSystem{
		ctx:          p.ctx,
		module:       mod,
		sharedBuffer: &buf,
		mu:           nil,
	}
	created := &WASMModuleInstance{
		module:       mod,
		fileSystem:   fs,
		sharedBuffer: buf,
		createdAt:    time.Now(),
	}

	if buf.Enabled {
		log.Debugf("Shared buffers enabled for %s: input=%d, output=%d, size=%d",
			p.pluginName, buf.InputBufferPtr, buf.OutputBufferPtr, buf.BufferSize)
	}
	return created, nil
}
// initializeSharedBuffer asks module for its shared-buffer layout.
//
// The module must export get_input_buffer_ptr, get_output_buffer_ptr and
// get_shared_buffer_size. If any export is missing, or any of the calls fails
// or yields no result, a SharedBufferInfo with Enabled == false is returned.
// Otherwise the first result of each call is stored (truncated to uint32) and
// Enabled is set.
func initializeSharedBuffer(module wazeroapi.Module, ctx context.Context) SharedBufferInfo {
	// Zero value: all pointers 0, Enabled false.
	var disabled SharedBufferInfo

	inputFn := module.ExportedFunction("get_input_buffer_ptr")
	outputFn := module.ExportedFunction("get_output_buffer_ptr")
	sizeFn := module.ExportedFunction("get_shared_buffer_size")
	if inputFn == nil || outputFn == nil || sizeFn == nil {
		log.Debug("Shared buffers not available (functions not exported)")
		return disabled
	}

	in, err := inputFn.Call(ctx)
	if err != nil || len(in) == 0 {
		log.Warnf("Failed to get input buffer pointer: %v", err)
		return disabled
	}

	out, err := outputFn.Call(ctx)
	if err != nil || len(out) == 0 {
		log.Warnf("Failed to get output buffer pointer: %v", err)
		return disabled
	}

	size, err := sizeFn.Call(ctx)
	if err != nil || len(size) == 0 {
		log.Warnf("Failed to get buffer size: %v", err)
		return disabled
	}

	return SharedBufferInfo{
		InputBufferPtr:  uint32(in[0]),
		OutputBufferPtr: uint32(out[0]),
		BufferSize:      uint32(size[0]),
		Enabled:         true,
	}
}
// destroyInstance shuts down and closes the module behind instance.
//
// If the module exports plugin_shutdown it is called first, then the module
// is closed. Teardown is best-effort: failures are logged as warnings rather
// than silently dropped, and the module is closed even when plugin_shutdown
// fails. A nil instance, or one without a module, is ignored.
func (p *WASMInstancePool) destroyInstance(instance *WASMModuleInstance) {
	if instance == nil || instance.module == nil {
		return
	}

	if shutdownFunc := instance.module.ExportedFunction("plugin_shutdown"); shutdownFunc != nil {
		if _, err := shutdownFunc.Call(p.ctx); err != nil {
			log.Warnf("plugin_shutdown failed for %s: %v", p.pluginName, err)
		}
	}

	if err := instance.module.Close(p.ctx); err != nil {
		log.Warnf("Failed to close WASM module for %s: %v", p.pluginName, err)
	}
}
// Close marks the pool as closed and destroys every idle instance.
//
// It is idempotent: calls after the first return nil without doing anything.
// Instances that are checked out at the time of the call are not touched here.
// Destroyed instances are removed from the pool accounting (and from the
// statistics when enabled), consistent with how Acquire and Release account
// for instances they destroy. Close always returns nil.
func (p *WASMInstancePool) Close() error {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return nil
	}
	p.closed = true
	// Close the channel while still holding the lock so that "closed is set"
	// and "channel is closed" become visible together to anyone checking
	// p.closed under p.mu.
	close(p.instances)
	p.mu.Unlock()

	// Receiving from the closed channel drains whatever was idle.
	destroyed := 0
	for instance := range p.instances {
		p.destroyInstance(instance)
		destroyed++
	}

	if destroyed > 0 {
		p.mu.Lock()
		p.currentInstances -= destroyed
		p.mu.Unlock()

		if p.config.EnableStatistics {
			p.stats.mu.Lock()
			p.stats.TotalDestroyed += int64(destroyed)
			p.stats.CurrentActive -= int64(destroyed)
			p.stats.mu.Unlock()
		}
	}

	log.Infof("Closed WASM instance pool for %s", p.pluginName)
	return nil
}
// GetStats returns a snapshot of the pool counters.
//
// The snapshot is built field by field instead of copying p.stats wholesale:
// copying the struct would also copy its sync.Mutex, and would do so while
// that mutex is locked, leaving the returned value with a permanently locked
// mutex. The mutex in the returned value is a fresh zero value.
func (p *WASMInstancePool) GetStats() PoolStats {
	p.stats.mu.Lock()
	defer p.stats.mu.Unlock()

	return PoolStats{
		TotalCreated:   p.stats.TotalCreated,
		TotalDestroyed: p.stats.TotalDestroyed,
		CurrentActive:  p.stats.CurrentActive,
		TotalWaits:     p.stats.TotalWaits,
		TotalRequests:  p.stats.TotalRequests,
		FailedRequests: p.stats.FailedRequests,
	}
}
// Execute acquires an instance, runs fn with it and releases the instance
// when fn returns (including when fn panics). It returns the error from
// Acquire if no instance could be obtained, otherwise whatever fn returns.
func (p *WASMInstancePool) Execute(fn func(*WASMModuleInstance) error) error {
	inst, acquireErr := p.Acquire()
	if acquireErr != nil {
		return acquireErr
	}
	defer p.Release(inst)

	return fn(inst)
}
// ExecuteFS is like Execute but hands fn the instance's file system instead
// of the instance itself. The instance is acquired before fn runs and
// released after it returns; an Acquire failure is returned without calling fn.
func (p *WASMInstancePool) ExecuteFS(fn func(filesystem.FileSystem) error) error {
	return p.Execute(func(inst *WASMModuleInstance) error {
		return fn(inst.fileSystem)
	})
}