package proxyfs
import (
"fmt"
"io"
"net/url"
"strings"
"sync/atomic"
"time"
agfs "github.com/c4pt0r/agfs/agfs-sdk/go"
"github.com/c4pt0r/agfs/agfs-server/pkg/filesystem"
"github.com/c4pt0r/agfs/agfs-server/pkg/plugin"
)
const (
// PluginName is the identifier this plugin reports from ProxyFSPlugin.Name
// and passes to NewProxyFS when it builds its file system.
PluginName = "proxyfs"
)
// convertFileInfo translates a single SDK file descriptor into the server's
// filesystem.FileInfo representation.
//
// Every field is copied one-to-one. Meta.Content is assigned directly, so the
// returned value refers to the same Content value as src rather than a copy.
func convertFileInfo(src agfs.FileInfo) filesystem.FileInfo {
	var dst filesystem.FileInfo

	dst.Name = src.Name
	dst.Size = src.Size
	dst.Mode = src.Mode
	dst.ModTime = src.ModTime
	dst.IsDir = src.IsDir

	dst.Meta.Name = src.Meta.Name
	dst.Meta.Type = src.Meta.Type
	dst.Meta.Content = src.Meta.Content

	return dst
}
// convertFileInfos translates a slice of SDK file descriptors into the
// server's representation, preserving order.
//
// The result is always a non-nil slice with exactly len(src) entries, even
// when src is nil or empty.
func convertFileInfos(src []agfs.FileInfo) []filesystem.FileInfo {
	converted := make([]filesystem.FileInfo, 0, len(src))
	for idx := range src {
		converted = append(converted, convertFileInfo(src[idx]))
	}
	return converted
}
// ProxyFS is a file system whose operations are forwarded to a remote AGFS
// server through an SDK client.
type ProxyFS struct {
// client is the SDK client currently in use. It is held in an atomic
// pointer so Reload can swap in a new client while other methods Load it.
client atomic.Pointer[agfs.Client]
// pluginName is the name supplied to NewProxyFS. It is stored but not
// read by any code visible in this file.
pluginName string
// baseURL is the remote server URL used to build clients; Stat also
// reports it under the "remote-url" metadata key.
baseURL string
}
// NewProxyFS builds a ProxyFS that talks to the AGFS server at baseURL.
//
// A client for baseURL is created and stored immediately; no connectivity
// check is performed here.
func NewProxyFS(baseURL string, pluginName string) *ProxyFS {
	fs := new(ProxyFS)
	fs.pluginName = pluginName
	fs.baseURL = baseURL
	fs.client.Store(agfs.NewClient(baseURL))
	return fs
}
// Reload replaces the current client with a freshly created one for the same
// base URL.
//
// The new client must pass its Health check before it is installed. If the
// check fails, the wrapped error is returned and the existing client is left
// in place.
func (p *ProxyFS) Reload() error {
	candidate := agfs.NewClient(p.baseURL)

	err := candidate.Health()
	if err != nil {
		return fmt.Errorf("failed to connect after reload: %w", err)
	}

	p.client.Store(candidate)
	return nil
}
// Create forwards a file-creation request for path to the remote server and
// returns the client's error unchanged.
func (p *ProxyFS) Create(path string) error {
	client := p.client.Load()
	return client.Create(path)
}
// Mkdir forwards a directory-creation request for path with permission bits
// perm to the remote server and returns the client's error unchanged.
func (p *ProxyFS) Mkdir(path string, perm uint32) error {
	client := p.client.Load()
	return client.Mkdir(path, perm)
}
// Remove forwards a removal request for path to the remote server and returns
// the client's error unchanged.
func (p *ProxyFS) Remove(path string) error {
	client := p.client.Load()
	return client.Remove(path)
}
// RemoveAll forwards a RemoveAll request for path to the remote server and
// returns the client's error unchanged.
func (p *ProxyFS) RemoveAll(path string) error {
	client := p.client.Load()
	return client.RemoveAll(path)
}
// Read returns up to size bytes of path starting at offset.
//
// Every path except "/reload" is read from the remote server. "/reload" is a
// local control file: its fixed help text is passed through
// plugin.ApplyRangeRead together with offset and size, and the remote server
// is not contacted.
func (p *ProxyFS) Read(path string, offset int64, size int64) ([]byte, error) {
	if path != "/reload" {
		return p.client.Load().Read(path, offset, size)
	}

	help := []byte("Write to this file to reload the proxy connection\n")
	return plugin.ApplyRangeRead(help, offset, size)
}
// Write sends data for path to the remote server and reports len(data) as the
// number of bytes written on success.
//
// Writing anything to "/reload" triggers Reload instead of a remote write; the
// payload itself is ignored apart from its length.
//
// offset and flags are accepted but not forwarded: the remote call receives
// only path and data.
func (p *ProxyFS) Write(path string, data []byte, offset int64, flags filesystem.WriteFlag) (int64, error) {
	written := int64(len(data))

	if path == "/reload" {
		if err := p.Reload(); err != nil {
			return 0, fmt.Errorf("reload failed: %w", err)
		}
		return written, nil
	}

	if _, err := p.client.Load().Write(path, data); err != nil {
		return 0, err
	}
	return written, nil
}
// ReadDir lists path on the remote server and converts the entries to the
// server's FileInfo type.
//
// When path is the root ("/"), a synthetic write-only "reload" control entry
// is appended to the listing. Its ModTime mirrors the first remote entry when
// one exists; if the remote root is empty, the current time is used instead,
// as Stat does for "/reload". The previous code indexed files[0]
// unconditionally and panicked on an empty root.
//
// Errors from the remote listing are returned unchanged with a nil slice.
func (p *ProxyFS) ReadDir(path string) ([]filesystem.FileInfo, error) {
	sdkFiles, err := p.client.Load().ReadDir(path)
	if err != nil {
		return nil, err
	}

	files := convertFileInfos(sdkFiles)
	if path != "/" {
		return files, nil
	}

	// Guard the files[0] access: an empty remote root is a valid listing.
	modTime := time.Now()
	if len(files) > 0 {
		modTime = files[0].ModTime
	}

	reloadFile := filesystem.FileInfo{
		Name:    "reload",
		Size:    0,
		Mode:    0o200,
		ModTime: modTime,
		IsDir:   false,
		Meta: filesystem.MetaData{
			Type: "control",
			Content: map[string]string{
				"description": "Write to this file to reload proxy connection",
			},
		},
	}
	return append(files, reloadFile), nil
}
// Stat returns metadata for path.
//
// "/reload" is described locally as a zero-length, write-only (0o200) control
// file stamped with the current time. Any other path is looked up on the
// remote server and converted. In both cases the metadata content carries a
// "remote-url" entry holding the proxy's base URL; the content map is created
// first if the remote entry had none.
func (p *ProxyFS) Stat(path string) (*filesystem.FileInfo, error) {
	if path == "/reload" {
		control := filesystem.FileInfo{
			Name:    "reload",
			Size:    0,
			Mode:    0o200,
			ModTime: time.Now(),
			IsDir:   false,
			Meta: filesystem.MetaData{
				Type: "control",
				Content: map[string]string{
					"description": "Write to this file to reload proxy connection",
					"remote-url":  p.baseURL,
				},
			},
		}
		return &control, nil
	}

	remote, err := p.client.Load().Stat(path)
	if err != nil {
		return nil, err
	}

	info := convertFileInfo(*remote)
	if info.Meta.Content == nil {
		info.Meta.Content = make(map[string]string)
	}
	info.Meta.Content["remote-url"] = p.baseURL
	return &info, nil
}
// Rename forwards a rename from oldPath to newPath to the remote server and
// returns the client's error unchanged.
func (p *ProxyFS) Rename(oldPath, newPath string) error {
	client := p.client.Load()
	return client.Rename(oldPath, newPath)
}
// Chmod forwards a permission change for path to the remote server and
// returns the client's error unchanged.
func (p *ProxyFS) Chmod(path string, mode uint32) error {
	client := p.client.Load()
	return client.Chmod(path, mode)
}
// Open returns a reader over the contents of path.
//
// The data is fetched from the remote server in a single Read call with
// offset 0 and size -1 (presumably "read everything" — confirm against the
// SDK) and then served from memory. Closing the returned reader is a no-op.
func (p *ProxyFS) Open(path string) (io.ReadCloser, error) {
	contents, err := p.client.Load().Read(path, 0, -1)
	if err != nil {
		return nil, err
	}

	var src io.Reader = newBytesReader(contents)
	return io.NopCloser(src), nil
}
// OpenWrite returns a writer for path built by filesystem.NewBufferedWriter
// with this file system's Write method as its sink.
//
// NOTE(review): the buffering and flush behavior live in NewBufferedWriter,
// which is not visible here. This method itself never fails.
func (p *ProxyFS) OpenWrite(path string) (io.WriteCloser, error) {
	writer := filesystem.NewBufferedWriter(path, p.Write)
	return writer, nil
}
// OpenStream opens a remote stream for path and wraps it in a
// ProxyStreamReader with a 64 KiB read buffer.
//
// Errors from opening the remote stream are returned unchanged.
func (p *ProxyFS) OpenStream(path string) (filesystem.StreamReader, error) {
	const chunkBufferSize = 64 * 1024

	remote, err := p.client.Load().ReadStream(path)
	if err != nil {
		return nil, err
	}

	wrapped := &ProxyStreamReader{
		reader: remote,
		path:   path,
		buf:    make([]byte, chunkBufferSize),
	}
	return wrapped, nil
}
// GetStream opens a remote stream for path and returns it wrapped in a
// *ProxyStream, typed as interface{}.
//
// Errors from opening the remote stream are returned unchanged.
func (p *ProxyFS) GetStream(path string) (interface{}, error) {
	remote, err := p.client.Load().ReadStream(path)
	if err != nil {
		return nil, err
	}

	wrapped := &ProxyStream{
		reader: remote,
		path:   path,
	}
	return wrapped, nil
}
// ProxyStreamReader serves an underlying byte stream as a sequence of chunks.
type ProxyStreamReader struct {
	// reader is the underlying stream; Close closes it.
	reader io.ReadCloser
	// path is the path the stream was opened for. It is not read by the
	// methods below.
	path string
	// buf is the scratch buffer each Read fills; its length caps chunk size.
	buf []byte
}

// ReadChunk performs one Read on the underlying stream and reports the result
// as (chunk, eof, err):
//
//   - bytes were read: a freshly allocated copy of them, false, nil. Any error
//     returned alongside those bytes is not reported by this call.
//   - no bytes and io.EOF: nil, true, io.EOF.
//   - no bytes and another error: nil, false, that error.
//   - no bytes and no error: nil, false, and a "read timeout" error.
//
// The timeout argument is accepted but not consulted by this implementation.
func (psr *ProxyStreamReader) ReadChunk(timeout time.Duration) ([]byte, bool, error) {
	count, readErr := psr.reader.Read(psr.buf)

	switch {
	case count > 0:
		// Copy out of the scratch buffer so the next Read cannot overwrite
		// the data handed to the caller.
		out := make([]byte, count)
		copy(out, psr.buf)
		return out, false, nil
	case readErr == io.EOF:
		return nil, true, io.EOF
	case readErr != nil:
		return nil, false, readErr
	default:
		return nil, false, fmt.Errorf("read timeout")
	}
}

// Close closes the underlying stream and returns its error unchanged.
func (psr *ProxyStreamReader) Close() error {
	stream := psr.reader
	return stream.Close()
}
// ProxyStream is a thin io.ReadCloser wrapper around an underlying stream.
type ProxyStream struct {
	// reader is the wrapped stream; Read and Close delegate to it.
	reader io.ReadCloser
	// path is the path the stream was opened for. It is not read by the
	// methods below.
	path string
}

// Read reads from the wrapped stream into p and returns its result unchanged.
func (ps *ProxyStream) Read(p []byte) (n int, err error) {
	n, err = ps.reader.Read(p)
	return n, err
}

// Close closes the wrapped stream and returns its error unchanged.
func (ps *ProxyStream) Close() error {
	stream := ps.reader
	return stream.Close()
}
// bytesReader is a minimal io.Reader over an in-memory byte slice.
type bytesReader struct {
	// data is the full content being served; it is not copied.
	data []byte
	// pos is the index of the next unread byte in data.
	pos int
}

// newBytesReader returns a reader positioned at the start of data.
func newBytesReader(data []byte) *bytesReader {
	r := new(bytesReader)
	r.data = data
	return r
}

// Read copies as many unread bytes as fit into p and advances the position.
// It returns 0 and io.EOF once all of data has been consumed.
func (r *bytesReader) Read(p []byte) (n int, err error) {
	if r.pos < len(r.data) {
		n = copy(p, r.data[r.pos:])
		r.pos += n
		return n, nil
	}
	return 0, io.EOF
}
// ProxyFSPlugin is the plugin wrapper around a ProxyFS.
type ProxyFSPlugin struct {
// fs is the file system returned by GetFileSystem. Initialize replaces it
// when the configuration supplies a base_url.
fs *ProxyFS
// baseURL is the remote server URL currently configured for the plugin.
baseURL string
}
// NewProxyFSPlugin builds a plugin whose file system targets baseURL.
//
// baseURL may be empty here; Validate and Initialize reject an empty URL
// unless the configuration provides one.
func NewProxyFSPlugin(baseURL string) *ProxyFSPlugin {
	plug := new(ProxyFSPlugin)
	plug.baseURL = baseURL
	plug.fs = NewProxyFS(baseURL, PluginName)
	return plug
}
// Name returns the plugin's identifier, PluginName.
func (p *ProxyFSPlugin) Name() string {
return PluginName
}
// Validate checks a configuration map without modifying the plugin.
//
// It returns an error when:
//   - cfg contains a key other than "base_url" or "mount_path";
//   - no base URL is available, i.e. cfg has no non-empty string "base_url"
//     and the plugin's own base URL is empty;
//   - the resulting base URL is rejected by url.Parse.
//
// A nil cfg is treated the same as an empty one.
func (p *ProxyFSPlugin) Validate(cfg map[string]interface{}) error {
	allowedKeys := []string{"base_url", "mount_path"}
	isAllowed := func(key string) bool {
		for _, candidate := range allowedKeys {
			if candidate == key {
				return true
			}
		}
		return false
	}

	// Ranging over and indexing a nil map are both safe, so no nil guard.
	for key := range cfg {
		if !isAllowed(key) {
			return fmt.Errorf("unknown configuration parameter: %s (allowed: %v)", key, allowedKeys)
		}
	}

	baseURL := p.baseURL
	if override, ok := cfg["base_url"].(string); ok && override != "" {
		baseURL = override
	}

	if baseURL == "" {
		return fmt.Errorf("base_url is required in configuration")
	}

	_, err := url.Parse(baseURL)
	if err != nil {
		return fmt.Errorf("invalid base_url format: %w", err)
	}
	return nil
}
// Initialize applies config to the plugin and verifies the remote server is
// reachable.
//
// If config holds a non-empty string "base_url", the plugin adopts it and
// rebuilds its file system for that URL before any check runs. It then
// requires the base URL to be non-empty and to contain "://", and finally
// calls Health on the file system's current client. The first failing step's
// error is returned.
func (p *ProxyFSPlugin) Initialize(config map[string]interface{}) error {
	// Indexing a nil map is safe, so no separate nil guard is needed.
	if override, ok := config["base_url"].(string); ok && override != "" {
		p.baseURL = override
		p.fs = NewProxyFS(override, PluginName)
	}

	switch {
	case p.baseURL == "":
		return fmt.Errorf("base_url is required in configuration")
	case !strings.Contains(p.baseURL, "://"):
		return fmt.Errorf("invalid base_url format: %s (expected format: http://hostname:port or http://hostname:port/api/v1). Did you forget to quote the URL?", p.baseURL)
	}

	err := p.fs.client.Load().Health()
	if err != nil {
		return fmt.Errorf("failed to connect to remote AGFS server at %s: %w", p.baseURL, err)
	}
	return nil
}
// GetFileSystem returns the plugin's ProxyFS as a filesystem.FileSystem.
func (p *ProxyFSPlugin) GetFileSystem() filesystem.FileSystem {
return p.fs
}
// GetReadme returns the plugin's built-in help text: a fixed, human-readable
// description of its features, configuration, the /reload control file, and
// usage examples. The text is a constant and does not depend on plugin state.
func (p *ProxyFSPlugin) GetReadme() string {
return `ProxyFS Plugin - Remote AGFS Proxy
This plugin proxies all file system operations to a remote AGFS HTTP API server.
FEATURES:
- Transparent proxying of all file system operations
- Full compatibility with AGFS HTTP API
- Connects to remote AGFS servers
- Supports all standard file operations
- Supports streaming operations (cat --stream)
- Transparent proxying of remote streamfs
- Implements filesystem.Streamer interface
CONFIGURATION:
base_url: URL of the remote AGFS server (e.g., "http://remote:8080/api/v1")
HOT RELOAD:
ProxyFS provides a special /reload file for hot-reloading the connection:
Echo to /reload to refresh the proxy connection:
echo '' > /proxyfs/reload
This is useful when:
- Remote server was restarted
- Network connection was interrupted
- Need to refresh connection pool
USAGE:
All standard file operations are proxied to the remote server:
Create a file:
touch /path/to/file
Write to a file:
echo "content" > /path/to/file
Read a file:
cat /path/to/file
Create a directory:
mkdir /path/to/dir
List directory:
ls /path/to/dir
Remove file/directory:
rm /path/to/file
rm -r /path/to/dir
Move/rename:
mv /old/path /new/path
Change permissions:
chmod 755 /path/to/file
STREAMING SUPPORT:
ProxyFS transparently proxies streaming operations to remote AGFS servers.
Access remote streamfs:
p cat --stream /proxyfs/remote/streamfs/video | ffplay -
Write to remote streamfs:
cat file.mp4 | p write --stream /proxyfs/remote/streamfs/video
All streaming features from remote streamfs are fully supported:
- Real-time data streaming
- Ring buffer with historical data
- Multiple concurrent readers (fanout)
- Persistent connections (no timeout disconnect)
EXAMPLES:
# Standard file operations
agfs:/> mkdir /proxyfs/remote/data
agfs:/> echo "hello" > /proxyfs/remote/data/file.txt
agfs:/> cat /proxyfs/remote/data/file.txt
hello
agfs:/> ls /proxyfs/remote/data
# Streaming operations (outside REPL)
$ p cat --stream /proxyfs/remote/streamfs/logs
$ cat video.mp4 | p write --stream /proxyfs/remote/streamfs/video
USE CASES:
- Connect to remote AGFS instances
- Federation of multiple AGFS servers
- Access remote services through local mount points
- Distributed file system scenarios
- Stream video/audio from remote streamfs
- Remote real-time data streaming
`
}
// GetConfigParams describes the configuration parameters the plugin
// advertises: a single required string parameter, "base_url", with an empty
// default.
func (p *ProxyFSPlugin) GetConfigParams() []plugin.ConfigParameter {
	baseURLParam := plugin.ConfigParameter{
		Name:        "base_url",
		Type:        "string",
		Required:    true,
		Default:     "",
		Description: "Base URL of the remote AGFS server (e.g., http://localhost:8080)",
	}
	return []plugin.ConfigParameter{baseURLParam}
}
// Shutdown is a no-op for this plugin and always returns nil.
func (p *ProxyFSPlugin) Shutdown() error {
return nil
}
// Compile-time check that *ProxyFSPlugin satisfies plugin.ServicePlugin.
var _ plugin.ServicePlugin = (*ProxyFSPlugin)(nil)