package mountablefs
import (
"fmt"
"io"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/c4pt0r/agfs/agfs-server/pkg/filesystem"
"github.com/c4pt0r/agfs/agfs-server/pkg/plugin"
"github.com/c4pt0r/agfs/agfs-server/pkg/plugin/api"
"github.com/c4pt0r/agfs/agfs-server/pkg/plugin/loader"
iradix "github.com/hashicorp/go-immutable-radix"
log "github.com/sirupsen/logrus"
)
// Values written to the Meta.Type field of FileInfo entries that MountableFS
// synthesizes itself instead of obtaining them from a mounted plugin.
const (
// MetaValueRoot tags the synthetic entry that Stat returns for "/".
MetaValueRoot = "root"
// MetaValueMountPoint tags synthetic directory entries produced by ReadDir
// and Stat for mount points and for virtual parents of mount points.
MetaValueMountPoint = "mount-point"
)
// MountPoint describes one plugin attached to the virtual filesystem tree.
type MountPoint struct {
// Path is the normalized path the plugin is mounted at.
Path string
// Plugin is the plugin instance that serves requests below Path.
Plugin plugin.ServicePlugin
// Config is the configuration recorded for the mount: an empty map for
// Mount, the caller-supplied map for MountPlugin.
Config map[string]interface{}
}
// PluginFactory produces a plugin instance for a registered filesystem type.
// Factories registered for external plugins return the same loaded instance
// on every call rather than constructing a new one.
type PluginFactory func() plugin.ServicePlugin
// MountableFS is a virtual filesystem that routes each operation to the
// plugin mounted at the longest matching path prefix.
type MountableFS struct {
// mountTree holds the current *iradix.Tree mapping mount paths to
// *MountPoint. Writers store a whole new tree; readers Load it without
// taking mu.
mountTree atomic.Value
// pluginFactories maps a filesystem type name to its factory.
pluginFactories map[string]PluginFactory
// pluginLoader loads and unloads external plugins.
pluginLoader *loader.PluginLoader
// pluginNameCounters records, per base plugin name, the last numeric
// suffix handed out by generateUniquePluginName.
pluginNameCounters map[string]int
// mu guards pluginFactories and pluginNameCounters and serializes
// mount-tree writers (Mount, MountPlugin, Unmount).
mu sync.RWMutex
// globalHandleID is the counter OpenHandle increments to allocate handle IDs.
globalHandleID atomic.Int64
// handleInfos maps a global handle ID to its mount and plugin-local handle.
handleInfos map[int64]*handleInfo
// handleInfosMu guards handleInfos.
handleInfosMu sync.RWMutex
}
// handleInfo is the bookkeeping record kept for every handle opened through
// MountableFS.OpenHandle.
type handleInfo struct {
// mount is the mount point whose filesystem produced the handle.
mount *MountPoint
// localHandle is the handle returned by the plugin's own OpenHandle.
localHandle filesystem.FileHandle
}
// NewMountableFS builds an empty MountableFS: no factories, no mounts, no open
// handles, and a plugin loader configured from poolConfig.
func NewMountableFS(poolConfig api.PoolConfig) *MountableFS {
	fs := new(MountableFS)
	fs.pluginFactories = map[string]PluginFactory{}
	fs.pluginLoader = loader.NewPluginLoader(poolConfig)
	fs.pluginNameCounters = map[string]int{}
	fs.handleInfos = map[int64]*handleInfo{}

	// Start from an empty radix tree so readers can always type-assert the
	// stored value.
	fs.mountTree.Store(iradix.New())
	fs.globalHandleID.Store(0)
	return fs
}
// GetPluginLoader exposes the loader used for external plugins.
func (mfs *MountableFS) GetPluginLoader() *loader.PluginLoader {
	pl := mfs.pluginLoader
	return pl
}
// RenamedPlugin decorates a ServicePlugin so that it reports a different name.
// Every other method is promoted from the embedded plugin unchanged.
type RenamedPlugin struct {
	plugin.ServicePlugin
	originalName string
	renamedName  string
}

// Name reports the replacement name, hiding the embedded plugin's own Name.
func (r *RenamedPlugin) Name() string {
	name := r.renamedName
	return name
}

// OriginalName reports the name recorded for the plugin before it was renamed.
func (r *RenamedPlugin) OriginalName() string {
	name := r.originalName
	return name
}
// generateUniquePluginName returns baseName when no factory is registered
// under it (resetting its counter to zero); otherwise it returns the first
// unused "<baseName>-<n>", advancing the per-name counter for each candidate.
//
// It reads pluginFactories and mutates pluginNameCounters without locking.
func (mfs *MountableFS) generateUniquePluginName(baseName string) string {
	if _, taken := mfs.pluginFactories[baseName]; !taken {
		mfs.pluginNameCounters[baseName] = 0
		return baseName
	}

	for {
		mfs.pluginNameCounters[baseName]++
		candidate := fmt.Sprintf("%s-%d", baseName, mfs.pluginNameCounters[baseName])
		if _, taken := mfs.pluginFactories[candidate]; !taken {
			return candidate
		}
	}
}
// RegisterPluginFactory stores factory under name, replacing any factory that
// was already registered with that name.
func (mfs *MountableFS) RegisterPluginFactory(name string, factory PluginFactory) {
	mfs.mu.Lock()
	defer mfs.mu.Unlock()

	factories := mfs.pluginFactories
	factories[name] = factory
}
// CreatePlugin invokes the factory registered under name and returns its
// result, or nil when no such factory exists. The factory runs while the read
// lock is held.
func (mfs *MountableFS) CreatePlugin(name string) plugin.ServicePlugin {
	mfs.mu.RLock()
	defer mfs.mu.RUnlock()

	if build, registered := mfs.pluginFactories[name]; registered {
		return build()
	}
	return nil
}
// Mount attaches an already-constructed plugin at path (after normalization).
// It returns an already-exists error when something is mounted at exactly that
// path. The plugin is neither validated nor initialized here, and the mount is
// recorded with an empty Config.
func (mfs *MountableFS) Mount(path string, plugin plugin.ServicePlugin) error {
	mfs.mu.Lock()
	defer mfs.mu.Unlock()

	path = filesystem.NormalizePath(path)
	key := []byte(path)

	current := mfs.mountTree.Load().(*iradix.Tree)
	if _, taken := current.Get(key); taken {
		return filesystem.NewAlreadyExistsError("mount", path)
	}

	// Plugins that want a reference to the enclosing filesystem opt in by
	// implementing SetParentFileSystem.
	if child, ok := plugin.(interface {
		SetParentFileSystem(filesystem.FileSystem)
	}); ok {
		child.SetParentFileSystem(mfs)
		log.Debugf("Set parentFS for plugin at %s", path)
	}

	entry := &MountPoint{
		Path:   path,
		Plugin: plugin,
		Config: make(map[string]interface{}),
	}
	// Insert returns a new tree; publish it so lock-free readers see the mount.
	updated, _, _ := current.Insert(key, entry)
	mfs.mountTree.Store(updated)
	return nil
}
// MountPlugin creates a plugin of type fstype from its registered factory,
// validates and initializes it, and mounts it at path (after normalization).
//
// The plugin sees a copy of config with an extra "mount_path" key holding the
// normalized path; the MountPoint records the caller's original config map.
//
// Errors:
//   - an already-exists error if something is mounted at exactly path;
//   - "unknown filesystem type" if no factory is registered for fstype;
//   - the plugin's Validate or Initialize error, wrapped with %w so callers
//     can inspect it with errors.Is / errors.As.
//
// The write lock is held for the whole call, including Validate/Initialize.
func (mfs *MountableFS) MountPlugin(fstype string, path string, config map[string]interface{}) error {
	mfs.mu.Lock()
	defer mfs.mu.Unlock()

	path = filesystem.NormalizePath(path)
	tree := mfs.mountTree.Load().(*iradix.Tree)
	if _, exists := tree.Get([]byte(path)); exists {
		return filesystem.NewAlreadyExistsError("mount", path)
	}

	factory, ok := mfs.pluginFactories[fstype]
	if !ok {
		return fmt.Errorf("unknown filesystem type: %s", fstype)
	}
	pluginInstance := factory()

	// Optional hooks: plugins opt in by implementing either setter.
	type rootFSSetter interface {
		SetRootFS(filesystem.FileSystem)
	}
	if setter, ok := pluginInstance.(rootFSSetter); ok {
		setter.SetRootFS(mfs)
		log.Debugf("Set rootFS for plugin %s at %s", fstype, path)
	}
	type parentFSSetter interface {
		SetParentFileSystem(filesystem.FileSystem)
	}
	if setter, ok := pluginInstance.(parentFSSetter); ok {
		setter.SetParentFileSystem(mfs)
		log.Debugf("Set parentFS for plugin %s at %s", fstype, path)
	}

	// Copy the config so adding mount_path never mutates the caller's map
	// (which may also be nil).
	configWithPath := make(map[string]interface{}, len(config)+1)
	for k, v := range config {
		configWithPath[k] = v
	}
	configWithPath["mount_path"] = path

	if err := pluginInstance.Validate(configWithPath); err != nil {
		return fmt.Errorf("failed to validate plugin: %w", err)
	}
	if err := pluginInstance.Initialize(configWithPath); err != nil {
		return fmt.Errorf("failed to initialize plugin: %w", err)
	}

	// Publish a new tree containing the mount so lock-free readers see it.
	newTree, _, _ := tree.Insert([]byte(path), &MountPoint{
		Path:   path,
		Plugin: pluginInstance,
		Config: config,
	})
	mfs.mountTree.Store(newTree)
	log.Infof("mounted %s at %s", fstype, path)
	return nil
}
// Unmount shuts down the plugin mounted at exactly path (after normalization)
// and removes it from the mount tree.
//
// It returns an error when nothing is mounted at path. If the plugin's
// Shutdown fails, that error is returned wrapped with %w (so callers can use
// errors.Is / errors.As) and the mount is left in place.
func (mfs *MountableFS) Unmount(path string) error {
	mfs.mu.Lock()
	defer mfs.mu.Unlock()

	path = filesystem.NormalizePath(path)
	tree := mfs.mountTree.Load().(*iradix.Tree)
	val, exists := tree.Get([]byte(path))
	if !exists {
		return fmt.Errorf("no mount at path: %s", path)
	}

	mount := val.(*MountPoint)
	if err := mount.Plugin.Shutdown(); err != nil {
		return fmt.Errorf("failed to shutdown plugin: %w", err)
	}

	// Delete returns a new tree; publish it for lock-free readers.
	newTree, _, _ := tree.Delete([]byte(path))
	mfs.mountTree.Store(newTree)
	log.Infof("Unmounted plugin at %s", path)
	return nil
}
// LoadExternalPluginWithType loads the plugin at libraryPath as the given
// pluginType and registers a factory for it under the plugin's own name,
// replacing any factory already registered with that name. WASM plugins are
// additionally handed this MountableFS as an extra loader argument.
//
// The registered factory always returns the single loaded instance.
func (mfs *MountableFS) LoadExternalPluginWithType(libraryPath string, pluginType loader.PluginType) (plugin.ServicePlugin, error) {
	var (
		loaded  plugin.ServicePlugin
		loadErr error
	)
	switch pluginType {
	case loader.PluginTypeWASM:
		log.Infof("Loading WASM plugin with host filesystem access to all agfs paths")
		loaded, loadErr = mfs.pluginLoader.LoadPluginWithType(libraryPath, pluginType, mfs)
	default:
		loaded, loadErr = mfs.pluginLoader.LoadPluginWithType(libraryPath, pluginType)
	}
	if loadErr != nil {
		return nil, loadErr
	}

	name := loaded.Name()
	shared := func() plugin.ServicePlugin {
		return loaded
	}
	mfs.RegisterPluginFactory(name, shared)
	log.Infof("Registered external plugin factory: %s (type: %s)", name, pluginType)
	return loaded, nil
}
// LoadExternalPlugin detects the type of the plugin at libraryPath, loads it,
// and registers a factory for it.
//
// WASM plugins are delegated to LoadExternalPluginWithType. For every other
// type, a name clash with an existing factory is resolved by registering the
// plugin under a generated "<name>-<n>" name and wrapping it in a
// RenamedPlugin that reports the new name.
//
// The returned plugin is the same instance the registered factory returns,
// i.e. the RenamedPlugin wrapper when a rename happened, the loaded plugin
// otherwise.
func (mfs *MountableFS) LoadExternalPlugin(libraryPath string) (plugin.ServicePlugin, error) {
	pluginType, err := loader.DetectPluginType(libraryPath)
	if err != nil {
		return nil, fmt.Errorf("failed to detect plugin type: %w", err)
	}
	if pluginType == loader.PluginTypeWASM {
		return mfs.LoadExternalPluginWithType(libraryPath, pluginType)
	}

	p, err := mfs.pluginLoader.LoadPlugin(libraryPath)
	if err != nil {
		return nil, err
	}
	originalName := p.Name()

	// Name selection and registration happen under one lock acquisition so a
	// concurrent registration cannot claim the chosen name in between.
	mfs.mu.Lock()
	finalName := mfs.generateUniquePluginName(originalName)
	var registered plugin.ServicePlugin = p
	if finalName != originalName {
		log.Infof("Plugin name '%s' already exists, using '%s' instead", originalName, finalName)
		registered = &RenamedPlugin{
			ServicePlugin: p,
			originalName:  originalName,
			renamedName:   finalName,
		}
	}
	mfs.pluginFactories[finalName] = func() plugin.ServicePlugin {
		return registered
	}
	mfs.mu.Unlock()

	log.Infof("Registered external plugin factory: %s", finalName)
	return registered, nil
}
// UnloadExternalPluginWithType asks the plugin loader to unload the plugin at
// libraryPath as the given pluginType. Registered factories and existing
// mounts are not touched here.
func (mfs *MountableFS) UnloadExternalPluginWithType(libraryPath string, pluginType loader.PluginType) error {
	err := mfs.pluginLoader.UnloadPluginWithType(libraryPath, pluginType)
	return err
}
// UnloadExternalPlugin asks the plugin loader to unload the plugin at
// libraryPath. Registered factories and existing mounts are not touched here.
func (mfs *MountableFS) UnloadExternalPlugin(libraryPath string) error {
	err := mfs.pluginLoader.UnloadPlugin(libraryPath)
	return err
}
// GetLoadedExternalPlugins returns the plugin loader's list of loaded plugins.
func (mfs *MountableFS) GetLoadedExternalPlugins() []string {
	loaded := mfs.pluginLoader.GetLoadedPlugins()
	return loaded
}
// GetPluginNameToPathMap returns the plugin loader's name-to-path map for
// external plugins.
func (mfs *MountableFS) GetPluginNameToPathMap() map[string]string {
	byName := mfs.pluginLoader.GetPluginNameToPathMap()
	return byName
}
// GetBuiltinPluginNames lists the registered factory names that do not appear
// in the loader's external name-to-path map. The result is never nil and is in
// map-iteration (unspecified) order.
func (mfs *MountableFS) GetBuiltinPluginNames() []string {
	mfs.mu.RLock()
	defer mfs.mu.RUnlock()

	external := mfs.pluginLoader.GetPluginNameToPathMap()
	builtin := []string{}
	for candidate := range mfs.pluginFactories {
		if _, isExternal := external[candidate]; isExternal {
			continue
		}
		builtin = append(builtin, candidate)
	}
	return builtin
}
// LoadExternalPluginsFromDirectory forwards to the plugin loader's
// LoadPluginsFromDirectory and returns its results unchanged. This method does
// not itself register any factories.
func (mfs *MountableFS) LoadExternalPluginsFromDirectory(dir string) ([]string, []error) {
	loaded, errs := mfs.pluginLoader.LoadPluginsFromDirectory(dir)
	return loaded, errs
}
// GetMounts returns every mount point in the current mount tree, in the
// tree's walk order. The result is nil when nothing is mounted.
func (mfs *MountableFS) GetMounts() []*MountPoint {
	var out []*MountPoint
	collect := func(_ []byte, v interface{}) bool {
		out = append(out, v.(*MountPoint))
		return false // false keeps the walk going
	}
	mfs.mountTree.Load().(*iradix.Tree).Root().Walk(collect)
	return out
}
// findMount resolves path (after normalization) to the deepest mount point
// that contains it.
//
// It returns the mount, the path relative to that mount, and true on success:
//   - "/" when path is exactly the mount path;
//   - the full path when the mount is the root mount "/";
//   - otherwise the remainder of path after the mount path (leading "/" kept).
//
// A mount only matches on a path-component boundary: "/ab" does not contain
// "/abc". When the longest byte-wise prefix in the tree ends in the middle of
// a component, the search retries with the enclosing directory, so with mounts
// "/a" and "/a/bc" the path "/a/bcd" resolves to "/a" rather than to nothing.
func (mfs *MountableFS) findMount(path string) (*MountPoint, string, bool) {
	path = filesystem.NormalizePath(path)
	root := mfs.mountTree.Load().(*iradix.Tree).Root()

	search := path
	for {
		k, v, found := root.LongestPrefix([]byte(search))
		if !found {
			return nil, "", false
		}

		// mountPath is a prefix of search, which is a prefix of path.
		mountPath := string(k)
		switch {
		case len(mountPath) == len(path):
			return v.(*MountPoint), "/", true
		case mountPath == "/":
			return v.(*MountPoint), path, true
		case path[len(mountPath)] == '/':
			return v.(*MountPoint), path[len(mountPath):], true
		}

		// The match ended mid-component. Any valid mount must be a prefix of
		// the directory containing that component, so retry from there. The
		// search string strictly shrinks, so the loop terminates.
		cut := strings.LastIndexByte(mountPath, '/')
		switch {
		case cut > 0:
			search = path[:cut]
		case cut == 0:
			search = "/"
		default:
			return nil, "", false
		}
	}
}
// Create creates a file through the filesystem of the mount containing path.
// Paths outside every mount are rejected with a permission-denied error.
func (mfs *MountableFS) Create(path string) error {
	mp, rel, ok := mfs.findMount(path)
	if !ok {
		return filesystem.NewPermissionDeniedError("create", path, "not allowed to create file in rootfs, use mount instead")
	}
	return mp.Plugin.GetFileSystem().Create(rel)
}
// Mkdir creates a directory through the filesystem of the mount containing
// path. Paths outside every mount are rejected with a permission-denied error.
func (mfs *MountableFS) Mkdir(path string, perm uint32) error {
	mp, rel, ok := mfs.findMount(path)
	if !ok {
		return filesystem.NewPermissionDeniedError("mkdir", path, "not allowed to create directory in rootfs, use mount instead")
	}
	return mp.Plugin.GetFileSystem().Mkdir(rel, perm)
}
// Remove forwards to Remove on the filesystem of the mount containing path,
// or returns a not-found error when no mount contains it.
func (mfs *MountableFS) Remove(path string) error {
	mp, rel, ok := mfs.findMount(path)
	if !ok {
		return filesystem.NewNotFoundError("remove", path)
	}
	return mp.Plugin.GetFileSystem().Remove(rel)
}
// RemoveAll forwards to RemoveAll on the filesystem of the mount containing
// path, or returns a not-found error when no mount contains it.
func (mfs *MountableFS) RemoveAll(path string) error {
	mp, rel, ok := mfs.findMount(path)
	if !ok {
		return filesystem.NewNotFoundError("removeall", path)
	}
	return mp.Plugin.GetFileSystem().RemoveAll(rel)
}
// Read forwards to Read on the filesystem of the mount containing path,
// passing offset and size through unchanged. It returns a not-found error
// when no mount contains path.
func (mfs *MountableFS) Read(path string, offset int64, size int64) ([]byte, error) {
	mp, rel, ok := mfs.findMount(path)
	if !ok {
		return nil, filesystem.NewNotFoundError("read", path)
	}
	return mp.Plugin.GetFileSystem().Read(rel, offset, size)
}
// Write forwards to Write on the filesystem of the mount containing path,
// passing data, offset and flags through unchanged. It returns 0 and a
// not-found error when no mount contains path.
func (mfs *MountableFS) Write(path string, data []byte, offset int64, flags filesystem.WriteFlag) (int64, error) {
	mp, rel, ok := mfs.findMount(path)
	if !ok {
		return 0, filesystem.NewNotFoundError("write", path)
	}
	return mp.Plugin.GetFileSystem().Write(rel, data, offset, flags)
}
// ReadDir lists the directory at path.
//
// Inside a mount, the listing comes from the mount's filesystem and is
// extended with a synthetic directory entry for every mount point that sits
// directly below path and is not already listed by name.
//
// Outside every mount, the listing is purely synthetic: one directory entry
// per distinct first path component of the mount points below path. If there
// are none, a not-found error is returned.
func (mfs *MountableFS) ReadDir(path string) ([]filesystem.FileInfo, error) {
	path = filesystem.NormalizePath(path)
	mount, relPath, found := mfs.findMount(path)

	// Keys of mounts below path all start with path plus a trailing slash.
	childPrefix := path
	if !strings.HasSuffix(childPrefix, "/") {
		childPrefix += "/"
	}

	if found {
		entries, err := mount.Plugin.GetFileSystem().ReadDir(relPath)
		if err != nil {
			return nil, err
		}

		root := mfs.mountTree.Load().(*iradix.Tree).Root()
		root.WalkPrefix([]byte(childPrefix), func(k []byte, _ interface{}) bool {
			child := strings.TrimPrefix(string(k), childPrefix)
			// Only immediate children are surfaced in this branch.
			if child == "" || strings.Contains(child, "/") {
				return false
			}
			for i := range entries {
				if entries[i].Name == child {
					return false
				}
			}
			entries = append(entries, filesystem.FileInfo{
				Name:    child,
				Size:    0,
				Mode:    0755,
				ModTime: time.Now(),
				IsDir:   true,
				Meta: filesystem.MetaData{
					Type: MetaValueMountPoint,
				},
			})
			return false
		})
		return entries, nil
	}

	root := mfs.mountTree.Load().(*iradix.Tree).Root()
	var listing []filesystem.FileInfo
	seen := make(map[string]bool)
	root.WalkPrefix([]byte(childPrefix), func(k []byte, _ interface{}) bool {
		rest := strings.TrimPrefix(string(k), childPrefix)
		if rest == "" {
			return false
		}
		// Deeper mounts contribute only their first component.
		first, _, _ := strings.Cut(rest, "/")
		if seen[first] {
			return false
		}
		seen[first] = true
		listing = append(listing, filesystem.FileInfo{
			Name:    first,
			Size:    0,
			Mode:    0755,
			ModTime: time.Now(),
			IsDir:   true,
			Meta: filesystem.MetaData{
				Name: "rootfs",
				Type: MetaValueMountPoint,
			},
		})
		return false
	})

	if len(listing) == 0 {
		return nil, filesystem.NewNotFoundError("readdir", path)
	}
	return listing, nil
}
// Stat describes the entry at path.
//
//   - "/" always yields a synthetic root directory entry.
//   - Inside a mount, the mount's filesystem answers; when path is the mount
//     point itself and the plugin reports the name "/", the name is replaced
//     by the last component of the mount path.
//   - Outside every mount, a synthetic directory entry is returned if at
//     least one mount point lies below path; otherwise not-found.
func (mfs *MountableFS) Stat(path string) (*filesystem.FileInfo, error) {
	path = filesystem.NormalizePath(path)
	if path == "/" {
		rootInfo := filesystem.FileInfo{
			Name:    "/",
			Size:    0,
			Mode:    0755,
			ModTime: time.Now(),
			IsDir:   true,
			Meta: filesystem.MetaData{
				Type: MetaValueRoot,
			},
		}
		return &rootInfo, nil
	}

	// lastComponent returns the final component of p (without its leading
	// slash), or "/" when that component is empty.
	lastComponent := func(p string) string {
		name := p[1:]
		if i := strings.LastIndex(name, "/"); i >= 0 {
			name = name[i+1:]
		}
		if name == "" {
			return "/"
		}
		return name
	}

	if mount, relPath, found := mfs.findMount(path); found {
		info, err := mount.Plugin.GetFileSystem().Stat(relPath)
		if err != nil {
			return nil, err
		}
		if path == mount.Path && info.Name == "/" {
			info.Name = lastComponent(path)
		}
		return info, nil
	}

	childPrefix := path
	if !strings.HasSuffix(childPrefix, "/") {
		childPrefix += "/"
	}
	hasMountBelow := false
	root := mfs.mountTree.Load().(*iradix.Tree).Root()
	root.WalkPrefix([]byte(childPrefix), func(_ []byte, _ interface{}) bool {
		hasMountBelow = true
		return true // one hit is enough; stop walking
	})
	if !hasMountBelow {
		return nil, filesystem.NewNotFoundError("stat", path)
	}

	return &filesystem.FileInfo{
		Name:    lastComponent(path),
		Size:    0,
		Mode:    0755,
		ModTime: time.Now(),
		IsDir:   true,
		Meta: filesystem.MetaData{
			Type: MetaValueMountPoint,
		},
	}, nil
}
// Rename moves oldPath to newPath when both resolve to the same mount point,
// delegating to that mount's filesystem. It fails when either path is outside
// every mount or when the two paths resolve to different mounts.
func (mfs *MountableFS) Rename(oldPath, newPath string) error {
	src, srcRel, srcOK := mfs.findMount(oldPath)
	dst, dstRel, dstOK := mfs.findMount(newPath)

	switch {
	case !srcOK || !dstOK:
		return fmt.Errorf("cannot rename: paths not in same mounted filesystem")
	case src != dst:
		return fmt.Errorf("cannot rename across different mounts")
	}
	return src.Plugin.GetFileSystem().Rename(srcRel, dstRel)
}
// Chmod forwards to Chmod on the filesystem of the mount containing path, or
// returns a not-found error when no mount contains it.
func (mfs *MountableFS) Chmod(path string, mode uint32) error {
	mp, rel, ok := mfs.findMount(path)
	if !ok {
		return filesystem.NewNotFoundError("chmod", path)
	}
	return mp.Plugin.GetFileSystem().Chmod(rel, mode)
}
// Touch touches the file at path in the mount that contains it.
//
// If the mount's filesystem implements filesystem.Toucher, that is used.
// Otherwise a fallback is applied: when Stat fails, an empty write with
// WriteFlagCreate is issued; when the path is a directory, an error is
// returned; an existing file is read in full and written back unchanged.
// Paths outside every mount yield a not-found error.
func (mfs *MountableFS) Touch(path string) error {
	mount, relPath, found := mfs.findMount(path)
	if !found {
		return filesystem.NewNotFoundError("touch", path)
	}

	fs := mount.Plugin.GetFileSystem()
	if toucher, ok := fs.(filesystem.Toucher); ok {
		return toucher.Touch(relPath)
	}

	info, statErr := fs.Stat(relPath)
	if statErr != nil {
		// Any Stat failure is treated as "does not exist yet".
		_, createErr := fs.Write(relPath, []byte{}, -1, filesystem.WriteFlagCreate)
		return createErr
	}
	if info.IsDir {
		return fmt.Errorf("cannot touch directory")
	}

	contents, err := fs.Read(relPath, 0, -1)
	if err != nil {
		return err
	}
	_, err = fs.Write(relPath, contents, -1, filesystem.WriteFlagNone)
	return err
}
// Open forwards to Open on the filesystem of the mount containing path, or
// returns a not-found error when no mount contains it.
func (mfs *MountableFS) Open(path string) (io.ReadCloser, error) {
	mp, rel, ok := mfs.findMount(path)
	if !ok {
		return nil, filesystem.NewNotFoundError("open", path)
	}
	return mp.Plugin.GetFileSystem().Open(rel)
}
// OpenWrite forwards to OpenWrite on the filesystem of the mount containing
// path, or returns a not-found error when no mount contains it.
func (mfs *MountableFS) OpenWrite(path string) (io.WriteCloser, error) {
	mp, rel, ok := mfs.findMount(path)
	if !ok {
		return nil, filesystem.NewNotFoundError("openwrite", path)
	}
	return mp.Plugin.GetFileSystem().OpenWrite(rel)
}
// OpenStream opens a stream reader for path when the filesystem of the
// containing mount implements filesystem.Streamer. It returns a not-found
// error when no mount contains path and a plain error when the filesystem
// does not support streaming.
func (mfs *MountableFS) OpenStream(path string) (filesystem.StreamReader, error) {
	mount, relPath, found := mfs.findMount(path)
	if !found {
		return nil, filesystem.NewNotFoundError("openstream", path)
	}

	fs := mount.Plugin.GetFileSystem()
	streamer, supported := fs.(filesystem.Streamer)
	if !supported {
		log.Debugf("[mountablefs] OpenStream: filesystem does not support streaming: %s (fs type: %T)", path, fs)
		return nil, fmt.Errorf("filesystem does not support streaming: %s", path)
	}

	log.Debugf("[mountablefs] OpenStream: found streamer for path %s (relPath: %s, fs type: %T)", path, relPath, fs)
	return streamer.OpenStream(relPath)
}
// GetStream returns the stream object for path when the filesystem of the
// containing mount has a GetStream(path) method. It returns a not-found error
// when no mount contains path and a plain error (logged at warn level) when
// the filesystem has no such method.
func (mfs *MountableFS) GetStream(path string) (interface{}, error) {
	mount, relPath, found := mfs.findMount(path)
	if !found {
		return nil, filesystem.NewNotFoundError("getstream", path)
	}

	fs := mount.Plugin.GetFileSystem()
	getter, supported := fs.(interface {
		GetStream(path string) (interface{}, error)
	})
	if !supported {
		log.Warnf("[mountablefs] GetStream: filesystem does not support streaming: %s (fs type: %T)", path, fs)
		return nil, fmt.Errorf("filesystem does not support streaming: %s", path)
	}

	log.Debugf("[mountablefs] GetStream: found stream getter for path %s (relPath: %s, fs type: %T)", path, relPath, fs)
	return getter.GetStream(relPath)
}
// OpenHandle opens a handle on path through the containing mount's filesystem
// and registers it under a newly allocated global ID.
//
// It returns a not-found error when no mount contains path, a not-supported
// error when the mount's filesystem does not implement filesystem.HandleFS,
// and the plugin's own error when its OpenHandle fails. The returned handle
// reports the global ID and the path exactly as passed in.
func (mfs *MountableFS) OpenHandle(path string, flags filesystem.OpenFlag, mode uint32) (filesystem.FileHandle, error) {
	mp, rel, ok := mfs.findMount(path)
	if !ok {
		return nil, filesystem.NewNotFoundError("openhandle", path)
	}

	backend, ok := mp.Plugin.GetFileSystem().(filesystem.HandleFS)
	if !ok {
		return nil, filesystem.NewNotSupportedError("openhandle", path)
	}

	local, err := backend.OpenHandle(rel, flags, mode)
	if err != nil {
		return nil, err
	}

	id := mfs.globalHandleID.Add(1)
	entry := &handleInfo{
		mount:       mp,
		localHandle: local,
	}
	mfs.handleInfosMu.Lock()
	mfs.handleInfos[id] = entry
	mfs.handleInfosMu.Unlock()

	wrapped := &globalFileHandle{
		globalID:    id,
		localHandle: local,
		mountPath:   mp.Path,
		fullPath:    path,
	}
	return wrapped, nil
}
// GetHandle looks up a handle previously registered by OpenHandle and returns
// a fresh wrapper around it, or filesystem.ErrNotFound for an unknown ID.
//
// The wrapper's full path is rebuilt from the mount path and the local
// handle's path, joined with exactly one slash, so a handle under the root
// mount "/" reports "/foo" rather than "//foo" and a handle on the mount root
// reports the mount path without a trailing slash.
func (mfs *MountableFS) GetHandle(id int64) (filesystem.FileHandle, error) {
	mfs.handleInfosMu.RLock()
	info, found := mfs.handleInfos[id]
	mfs.handleInfosMu.RUnlock()
	if !found {
		return nil, filesystem.ErrNotFound
	}

	// NOTE(review): assumes localHandle.Path() is relative to the mount (the
	// relPath given to the plugin's OpenHandle) — confirm against plugins.
	fullPath := info.mount.Path
	if local := info.localHandle.Path(); local != "" && local != "/" {
		fullPath = strings.TrimSuffix(info.mount.Path, "/") + "/" + strings.TrimPrefix(local, "/")
	}

	return &globalFileHandle{
		globalID:    id,
		localHandle: info.localHandle,
		mountPath:   info.mount.Path,
		fullPath:    fullPath,
	}, nil
}
// CloseHandle closes the plugin-local handle registered under id and, only if
// that succeeds, removes the registration. It returns filesystem.ErrNotFound
// for an unknown ID. When Close fails, the error is returned and the handle
// stays registered.
func (mfs *MountableFS) CloseHandle(id int64) error {
	mfs.handleInfosMu.RLock()
	entry, known := mfs.handleInfos[id]
	mfs.handleInfosMu.RUnlock()
	if !known {
		return filesystem.ErrNotFound
	}

	// Close runs without holding the lock.
	if err := entry.localHandle.Close(); err != nil {
		return err
	}

	mfs.handleInfosMu.Lock()
	delete(mfs.handleInfos, id)
	mfs.handleInfosMu.Unlock()
	return nil
}
// globalFileHandle wraps a plugin-local file handle so that it reports the
// global ID and full path assigned by MountableFS. All I/O is forwarded to
// the wrapped handle.
type globalFileHandle struct {
	globalID    int64                 // ID allocated by MountableFS
	localHandle filesystem.FileHandle // handle owned by the mounted plugin
	mountPath   string                // path of the mount that produced the handle
	fullPath    string                // path reported by Path()
}

// ID returns the global handle ID, not the wrapped handle's own ID.
func (gh *globalFileHandle) ID() int64 {
	id := gh.globalID
	return id
}

// Path returns the full path recorded for the handle.
func (gh *globalFileHandle) Path() string {
	p := gh.fullPath
	return p
}

// Read forwards to the wrapped handle.
func (gh *globalFileHandle) Read(buf []byte) (int, error) {
	n, err := gh.localHandle.Read(buf)
	return n, err
}

// ReadAt forwards to the wrapped handle.
func (gh *globalFileHandle) ReadAt(buf []byte, offset int64) (int, error) {
	n, err := gh.localHandle.ReadAt(buf, offset)
	return n, err
}

// Write forwards to the wrapped handle.
func (gh *globalFileHandle) Write(data []byte) (int, error) {
	n, err := gh.localHandle.Write(data)
	return n, err
}

// WriteAt forwards to the wrapped handle.
func (gh *globalFileHandle) WriteAt(data []byte, offset int64) (int, error) {
	n, err := gh.localHandle.WriteAt(data, offset)
	return n, err
}

// Seek forwards to the wrapped handle.
func (gh *globalFileHandle) Seek(offset int64, whence int) (int64, error) {
	pos, err := gh.localHandle.Seek(offset, whence)
	return pos, err
}

// Sync forwards to the wrapped handle.
func (gh *globalFileHandle) Sync() error {
	err := gh.localHandle.Sync()
	return err
}

// Close closes the wrapped handle directly. It does not touch the handle
// table kept by MountableFS.
func (gh *globalFileHandle) Close() error {
	err := gh.localHandle.Close()
	return err
}

// Stat forwards to the wrapped handle.
func (gh *globalFileHandle) Stat() (*filesystem.FileInfo, error) {
	info, err := gh.localHandle.Stat()
	return info, err
}

// Flags forwards to the wrapped handle.
func (gh *globalFileHandle) Flags() filesystem.OpenFlag {
	flags := gh.localHandle.Flags()
	return flags
}
// Compile-time checks that the types in this file satisfy the filesystem
// interfaces they are used as.
var _ filesystem.HandleFS = (*MountableFS)(nil)
var _ filesystem.FileHandle = (*globalFileHandle)(nil)