package streamfs
import (
"bytes"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
"github.com/c4pt0r/agfs/agfs-server/pkg/filesystem"
"github.com/c4pt0r/agfs/agfs-server/pkg/plugin"
"github.com/c4pt0r/agfs/agfs-server/pkg/plugin/config"
log "github.com/sirupsen/logrus"
)
const (
PluginName = "streamfs"
)
func parseSize(s string) (int64, error) {
s = strings.TrimSpace(strings.ToUpper(s))
if val, err := strconv.ParseInt(s, 10, 64); err == nil {
return val, nil
}
units := map[string]int64{
"B": 1,
"KB": 1024,
"MB": 1024 * 1024,
"GB": 1024 * 1024 * 1024,
}
for suffix, multiplier := range units {
if strings.HasSuffix(s, suffix) {
numStr := strings.TrimSuffix(s, suffix)
numStr = strings.TrimSpace(numStr)
if val, err := strconv.ParseFloat(numStr, 64); err == nil {
return int64(val * float64(multiplier)), nil
}
}
}
return 0, fmt.Errorf("invalid size format: %s (expected format: 512KB, 1MB, etc)", s)
}
func formatSize(bytes int64) string {
const unit = 1024
if bytes < unit {
return fmt.Sprintf("%dB", bytes)
}
div, exp := int64(unit), 0
for n := bytes / unit; n >= unit; n /= unit {
div *= unit
exp++
}
units := []string{"KB", "MB", "GB", "TB"}
if exp >= len(units) {
exp = len(units) - 1
}
return fmt.Sprintf("%.1f%s", float64(bytes)/float64(div), units[exp])
}
type Reader struct {
id string
ch chan []byte
registered time.Time
droppedCount int64
readIndex int64
}
type streamReader struct {
sf *StreamFile
readerID string
ch <-chan []byte
}
func (sr *streamReader) ReadChunk(timeout time.Duration) ([]byte, bool, error) {
return sr.sf.ReadChunk(sr.readerID, sr.ch, timeout)
}
func (sr *streamReader) Close() error {
sr.sf.UnregisterReader(sr.readerID)
return nil
}
type StreamFile struct {
name string
mu sync.RWMutex
offset int64
closed bool
modTime time.Time
readers map[string]*Reader
nextReaderID int
channelBuffer int
ringBuffer [][]byte
ringSize int
writeIndex int64
totalChunks int64
}
func NewStreamFile(name string, channelBuffer int, ringSize int) *StreamFile {
if channelBuffer <= 0 {
channelBuffer = 100
}
if ringSize <= 0 {
ringSize = 100
}
sf := &StreamFile{
name: name,
modTime: time.Now(),
readers: make(map[string]*Reader),
nextReaderID: 0,
channelBuffer: channelBuffer,
ringBuffer: make([][]byte, ringSize),
ringSize: ringSize,
writeIndex: 0,
totalChunks: 0,
}
return sf
}
func (sf *StreamFile) RegisterReader() (string, <-chan []byte) {
sf.mu.Lock()
defer sf.mu.Unlock()
readerID := fmt.Sprintf("reader_%d_%d", sf.nextReaderID, time.Now().UnixNano())
sf.nextReaderID++
historyStart := sf.totalChunks - int64(sf.ringSize)
if historyStart < 0 {
historyStart = 0
}
reader := &Reader{
id: readerID,
ch: make(chan []byte, sf.channelBuffer),
registered: time.Now(),
droppedCount: 0,
readIndex: historyStart,
}
sf.readers[readerID] = reader
log.Infof("[streamfs] Registered reader %s for stream %s (total readers: %d, starting at chunk %d, current chunk: %d)",
readerID, sf.name, len(sf.readers), reader.readIndex, sf.totalChunks)
go sf.sendHistoricalData(reader)
return readerID, reader.ch
}
func (sf *StreamFile) sendHistoricalData(reader *Reader) {
sf.mu.RLock()
defer sf.mu.RUnlock()
historyStart := sf.totalChunks - int64(sf.ringSize)
if historyStart < 0 {
historyStart = 0
}
if reader.readIndex < sf.totalChunks && sf.totalChunks > 0 {
log.Debugf("[streamfs] Sending historical data to reader %s (from chunk %d to %d)",
reader.id, historyStart, sf.totalChunks)
for i := historyStart; i < sf.totalChunks; i++ {
ringIdx := int(i % int64(sf.ringSize))
if sf.ringBuffer[ringIdx] != nil {
select {
case reader.ch <- sf.ringBuffer[ringIdx]:
default:
log.Warnf("[streamfs] Reader %s channel full during historical data send", reader.id)
return
}
}
}
}
}
func (sf *StreamFile) UnregisterReader(readerID string) {
sf.mu.Lock()
defer sf.mu.Unlock()
if reader, exists := sf.readers[readerID]; exists {
close(reader.ch)
delete(sf.readers, readerID)
log.Infof("[streamfs] Unregistered reader %s for stream %s (dropped: %d chunks, total readers: %d)",
readerID, sf.name, reader.droppedCount, len(sf.readers))
}
}
func (sf *StreamFile) Write(data []byte) error {
sf.mu.Lock()
if sf.closed {
sf.mu.Unlock()
return fmt.Errorf("stream is closed")
}
chunk := make([]byte, len(data))
copy(chunk, data)
sf.offset += int64(len(data))
sf.modTime = time.Now()
ringIdx := int(sf.writeIndex % int64(sf.ringSize))
sf.ringBuffer[ringIdx] = chunk
sf.writeIndex++
sf.totalChunks++
readerSnapshot := make([]*Reader, 0, len(sf.readers))
for _, reader := range sf.readers {
readerSnapshot = append(readerSnapshot, reader)
}
sf.mu.Unlock()
successCount := 0
dropCount := 0
for _, reader := range readerSnapshot {
select {
case reader.ch <- chunk:
successCount++
default:
reader.droppedCount++
dropCount++
log.Warnf("[streamfs] Reader %s is slow, dropped chunk (total dropped: %d)", reader.id, reader.droppedCount)
}
}
if len(readerSnapshot) == 0 {
log.Debugf("[streamfs] Buffered %d bytes to ring (no readers, total chunks: %d)",
len(data), sf.totalChunks)
} else {
log.Debugf("[streamfs] Fanout %d bytes to %d readers (success: %d, dropped: %d, total chunks: %d)",
len(data), len(readerSnapshot), successCount, dropCount, sf.totalChunks)
}
return nil
}
func (sf *StreamFile) ReadChunk(readerID string, ch <-chan []byte, timeout time.Duration) ([]byte, bool, error) {
select {
case data, ok := <-ch:
if !ok {
return nil, true, io.EOF
}
return data, false, nil
case <-time.After(timeout):
sf.mu.RLock()
closed := sf.closed
sf.mu.RUnlock()
if closed {
return nil, true, io.EOF
}
return nil, false, fmt.Errorf("read timeout")
}
}
func (sf *StreamFile) Close() error {
sf.mu.Lock()
defer sf.mu.Unlock()
sf.closed = true
for id, reader := range sf.readers {
close(reader.ch)
log.Infof("[streamfs] Closed reader %s for stream %s (dropped: %d chunks)", id, sf.name, reader.droppedCount)
}
sf.readers = make(map[string]*Reader)
log.Infof("[streamfs] Stream %s closed", sf.name)
return nil
}
func (sf *StreamFile) GetInfo() filesystem.FileInfo {
sf.mu.RLock()
defer sf.mu.RUnlock()
name := sf.name
if len(name) > 0 && name[0] == '/' {
name = name[1:]
}
return filesystem.FileInfo{
Name: name,
Size: sf.offset,
Mode: 0644,
ModTime: sf.modTime,
IsDir: false,
Meta: filesystem.MetaData{
Name: PluginName,
Type: "stream",
Content: map[string]string{
"total_written": fmt.Sprintf("%d", sf.offset),
"active_readers": fmt.Sprintf("%d", len(sf.readers)),
},
},
}
}
type StreamFS struct {
streams map[string]*StreamFile
mu sync.RWMutex
channelBuffer int
ringSize int
pluginName string
}
func NewStreamFS(channelBuffer int, ringSize int) *StreamFS {
if channelBuffer <= 0 {
channelBuffer = 100
}
if ringSize <= 0 {
ringSize = 100
}
return &StreamFS{
streams: make(map[string]*StreamFile),
channelBuffer: channelBuffer,
ringSize: ringSize,
pluginName: PluginName,
}
}
func (sfs *StreamFS) Create(path string) error {
sfs.mu.Lock()
defer sfs.mu.Unlock()
if _, exists := sfs.streams[path]; exists {
return fmt.Errorf("stream already exists: %s", path)
}
sfs.streams[path] = NewStreamFile(path, sfs.channelBuffer, sfs.ringSize)
return nil
}
func (sfs *StreamFS) Mkdir(path string, perm uint32) error {
return fmt.Errorf("streamfs does not support directories")
}
func (sfs *StreamFS) Remove(path string) error {
sfs.mu.Lock()
defer sfs.mu.Unlock()
stream, exists := sfs.streams[path]
if !exists {
return fmt.Errorf("stream not found: %s", path)
}
stream.Close()
delete(sfs.streams, path)
return nil
}
func (sfs *StreamFS) RemoveAll(path string) error {
return sfs.Remove(path)
}
func (sfs *StreamFS) Read(path string, offset int64, size int64) ([]byte, error) {
if path == "/README" {
content := []byte(getReadme())
return plugin.ApplyRangeRead(content, offset, size)
}
return nil, fmt.Errorf("use stream mode for reading stream files")
}
func (sfs *StreamFS) Write(path string, data []byte, offset int64, flags filesystem.WriteFlag) (int64, error) {
sfs.mu.Lock()
stream, exists := sfs.streams[path]
if !exists {
stream = NewStreamFile(path, sfs.channelBuffer, sfs.ringSize)
sfs.streams[path] = stream
}
sfs.mu.Unlock()
err := stream.Write(data)
if err != nil {
return 0, err
}
return int64(len(data)), nil
}
func (sfs *StreamFS) ReadDir(path string) ([]filesystem.FileInfo, error) {
if path != "/" {
return nil, fmt.Errorf("not a directory: %s", path)
}
sfs.mu.RLock()
defer sfs.mu.RUnlock()
readme := filesystem.FileInfo{
Name: "README",
Size: int64(len(getReadme())),
Mode: 0444,
ModTime: time.Now(),
IsDir: false,
Meta: filesystem.MetaData{
Name: PluginName,
Type: "doc",
},
}
files := []filesystem.FileInfo{readme}
for _, stream := range sfs.streams {
files = append(files, stream.GetInfo())
}
return files, nil
}
func (sfs *StreamFS) Stat(path string) (*filesystem.FileInfo, error) {
if path == "/" {
info := &filesystem.FileInfo{
Name: "/",
Size: 0,
Mode: 0755,
ModTime: time.Now(),
IsDir: true,
Meta: filesystem.MetaData{
Name: PluginName,
},
}
return info, nil
}
if path == "/README" {
readme := getReadme()
info := &filesystem.FileInfo{
Name: "README",
Size: int64(len(readme)),
Mode: 0444,
ModTime: time.Now(),
IsDir: false,
Meta: filesystem.MetaData{
Name: PluginName,
Type: "doc",
},
}
return info, nil
}
sfs.mu.RLock()
stream, exists := sfs.streams[path]
sfs.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("stream not found: %s", path)
}
info := stream.GetInfo()
return &info, nil
}
func (sfs *StreamFS) Rename(oldPath, newPath string) error {
return fmt.Errorf("streamfs does not support rename")
}
func (sfs *StreamFS) Chmod(path string, mode uint32) error {
return fmt.Errorf("streamfs does not support chmod")
}
func (sfs *StreamFS) Open(path string) (io.ReadCloser, error) {
if path == "/README" {
return io.NopCloser(bytes.NewReader([]byte(getReadme()))), nil
}
return nil, fmt.Errorf("use stream mode for reading stream files")
}
func (sfs *StreamFS) OpenWrite(path string) (io.WriteCloser, error) {
return &streamWriter{sfs: sfs, path: path}, nil
}
func (sfs *StreamFS) OpenStream(path string) (filesystem.StreamReader, error) {
sfs.mu.Lock()
stream, exists := sfs.streams[path]
if !exists {
stream = NewStreamFile(path, sfs.channelBuffer, sfs.ringSize)
sfs.streams[path] = stream
log.Infof("[streamfs] Auto-created stream %s for reader", path)
}
sfs.mu.Unlock()
readerID, ch := stream.RegisterReader()
log.Infof("[streamfs] Opened stream %s with reader %s", path, readerID)
return &streamReader{
sf: stream,
readerID: readerID,
ch: ch,
}, nil
}
func (sfs *StreamFS) GetStream(path string) (interface{}, error) {
sfs.mu.Lock()
defer sfs.mu.Unlock()
stream, exists := sfs.streams[path]
if !exists {
stream = NewStreamFile(path, sfs.channelBuffer, sfs.ringSize)
sfs.streams[path] = stream
log.Infof("[streamfs] Auto-created stream %s for reader", path)
}
return stream, nil
}
type streamWriter struct {
sfs *StreamFS
path string
}
func (sw *streamWriter) Write(p []byte) (n int, err error) {
_, err = sw.sfs.Write(sw.path, p, -1, filesystem.WriteFlagAppend)
if err != nil {
return 0, err
}
return len(p), nil
}
func (sw *streamWriter) Close() error {
return nil
}
type StreamFSPlugin struct {
fs *StreamFS
channelBuffer int
ringSize int
}
func NewStreamFSPlugin() *StreamFSPlugin {
return &StreamFSPlugin{
channelBuffer: 100,
ringSize: 100,
}
}
func (p *StreamFSPlugin) Name() string {
return PluginName
}
func (p *StreamFSPlugin) Validate(cfg map[string]interface{}) error {
allowedKeys := []string{"channel_buffer_size", "ring_buffer_size", "mount_path"}
if err := config.ValidateOnlyKnownKeys(cfg, allowedKeys); err != nil {
return err
}
if val, exists := cfg["channel_buffer_size"]; exists {
switch v := val.(type) {
case string:
if _, err := config.ParseSize(v); err != nil {
return fmt.Errorf("invalid channel_buffer_size: %w", err)
}
case int, int64, float64:
default:
return fmt.Errorf("channel_buffer_size must be a size string (e.g., '512KB') or number")
}
}
if val, exists := cfg["ring_buffer_size"]; exists {
switch v := val.(type) {
case string:
if _, err := config.ParseSize(v); err != nil {
return fmt.Errorf("invalid ring_buffer_size: %w", err)
}
case int, int64, float64:
default:
return fmt.Errorf("ring_buffer_size must be a size string (e.g., '1MB') or number")
}
}
return nil
}
func (p *StreamFSPlugin) Initialize(config map[string]interface{}) error {
const defaultChunkSize = 64 * 1024
channelBufferBytes := int64(6 * 1024 * 1024)
if bufSizeStr, ok := config["channel_buffer_size"].(string); ok {
if parsed, err := parseSize(bufSizeStr); err == nil {
channelBufferBytes = parsed
} else {
log.Warnf("[streamfs] Invalid channel_buffer_size '%s': %v, using default", bufSizeStr, err)
}
} else if bufSize, ok := config["channel_buffer_size"].(int); ok {
channelBufferBytes = int64(bufSize)
} else if bufSizeFloat, ok := config["channel_buffer_size"].(float64); ok {
channelBufferBytes = int64(bufSizeFloat)
} else if bufSizeInt64, ok := config["channel_buffer_size"].(int64); ok {
channelBufferBytes = bufSizeInt64
}
ringBufferBytes := int64(6 * 1024 * 1024)
if ringSizeStr, ok := config["ring_buffer_size"].(string); ok {
if parsed, err := parseSize(ringSizeStr); err == nil {
ringBufferBytes = parsed
} else {
log.Warnf("[streamfs] Invalid ring_buffer_size '%s': %v, using default", ringSizeStr, err)
}
} else if ringSize, ok := config["ring_buffer_size"].(int); ok {
ringBufferBytes = int64(ringSize)
} else if ringSizeFloat, ok := config["ring_buffer_size"].(float64); ok {
ringBufferBytes = int64(ringSizeFloat)
} else if ringSizeInt64, ok := config["ring_buffer_size"].(int64); ok {
ringBufferBytes = ringSizeInt64
}
p.channelBuffer = int(channelBufferBytes / defaultChunkSize)
if p.channelBuffer < 1 {
p.channelBuffer = 1
}
p.ringSize = int(ringBufferBytes / defaultChunkSize)
if p.ringSize < 1 {
p.ringSize = 1
}
p.fs = NewStreamFS(p.channelBuffer, p.ringSize)
log.Infof("[streamfs] Initialized with channel buffer: %s (%d chunks), ring buffer: %s (%d chunks)",
formatSize(channelBufferBytes), p.channelBuffer,
formatSize(ringBufferBytes), p.ringSize)
return nil
}
func (p *StreamFSPlugin) GetFileSystem() filesystem.FileSystem {
return p.fs
}
func (p *StreamFSPlugin) GetReadme() string {
return getReadme()
}
func (p *StreamFSPlugin) GetConfigParams() []plugin.ConfigParameter {
return []plugin.ConfigParameter{
{
Name: "channel_buffer_size",
Type: "string",
Required: false,
Default: "512KB",
Description: "Channel buffer size (e.g., '512KB', '1MB')",
},
{
Name: "ring_buffer_size",
Type: "string",
Required: false,
Default: "1MB",
Description: "Ring buffer size (e.g., '1MB', '10MB')",
},
}
}
func (p *StreamFSPlugin) Shutdown() error {
return nil
}
func getReadme() string {
return `StreamFS Plugin - Streaming File System
This plugin provides streaming files that support multiple concurrent readers and writers
with real-time data fanout and ring buffer for late joiners.
FEATURES:
- Multiple writers can append data to a stream concurrently
- Multiple readers can consume from the stream independently (fanout/broadcast)
- Ring buffer (1000 chunks) stores recent data for late-joining readers
- Persistent streaming: readers wait indefinitely for new data (no timeout disconnect)
- HTTP chunked transfer with automatic flow control
- Memory-based storage with configurable channel buffer per reader
ARCHITECTURE:
- Each stream maintains a ring buffer of recent chunks (default: last 1000 chunks)
- New readers automatically receive all available historical data from ring buffer
- Writers fanout data to all active readers via buffered channels
- Readers wait indefinitely for new data (30s check interval, but never disconnect)
- Slow readers may drop chunks if their channel buffer fills up
COMMAND REFERENCE:
Write (Producer):
cat file | agfs write --stream /streamfs/stream
echo "data" | agfs write /streamfs/stream
Read (Consumer):
agfs cat --stream /streamfs/stream
agfs cat --stream /streamfs/stream > output.dat
agfs cat --stream /streamfs/stream | ffplay -
Manage:
agfs ls /streamfs
agfs stat /streamfs/stream
agfs rm /streamfs/stream
CONFIGURATION:
[plugins.streamfs]
enabled = true
path = "/streamfs"
[plugins.streamfs.config]
# Channel buffer size per reader (supports units: KB, MB, GB or raw bytes)
# Controls how much data each reader can buffer before dropping chunks
# For live streaming: 256KB - 512KB (low latency)
# For VOD/recording: 4MB - 8MB (smooth playback)
# Default: 6MB
# Examples: "512KB", "1MB", "6MB", or 524288 (bytes)
channel_buffer_size = "512KB"
# Ring buffer size for historical data (supports units: KB, MB, GB or raw bytes)
# Stores recent data for late-joining readers
# For live streaming: 512KB - 1MB (low latency, less memory)
# For VOD: 4MB - 8MB (more history for seekable playback)
# Default: 6MB
# Examples: "1MB", "4MB", or 1048576 (bytes)
ring_buffer_size = "1MB"
IMPORTANT NOTES:
- Streams are in-memory only (not persistent across restarts)
- Ring buffer stores recent data (configurable, default 6MB)
- Late-joining readers receive historical data from ring buffer
- Readers never timeout - they wait indefinitely for new data
- Writer chunk size: 64KB (configured in CLI write --stream)
- Channel buffer: configurable per reader (default 6MB)
- Slow readers may drop chunks if they can't keep up
- MUST use --stream flag for reading streams (cat --stream)
- Regular cat without --stream will fail with error
MEMORY USAGE:
File Size vs Memory Usage:
- 'ls' and 'stat' show TOTAL BYTES WRITTEN (cumulative counter)
- This is NOT the actual memory usage - just a throughput statistic
- Example: Stream shows 1GB in 'ls', but only uses 6MB RAM (ring buffer)
- The file size will continuously grow as data is written
- This is similar to /dev/null - unlimited writes, fixed memory
Actual Memory Footprint:
- Ring buffer: Fixed at ring_buffer_size (default: 6MB)
- Per reader channel: Fixed at channel_buffer_size (default: 6MB per reader)
- Total memory = ring_buffer_size + (channel_buffer_size × number of readers)
- Example with 3 readers: 6MB (ring) + 3×6MB (readers) = 24MB total
- Old data in ring buffer is automatically overwritten (circular buffer)
- No disk space is used - everything is in memory only
Overflow Protection:
- All counters use int64 to prevent overflow (max: 9.2 EB ≈ 292 years at 1GB/s)
- Ring buffer index calculations are overflow-safe on both 32-bit and 64-bit systems
- Stream can run indefinitely without counter overflow concerns
PERFORMANCE TIPS:
- For live streaming: Use smaller buffers (256KB-512KB) to reduce latency
- For VOD/recording: Use larger buffers (4MB-8MB) for smoother playback
- For video streaming: Start writer first to fill ring buffer
- Increase channel_buffer_size for high-bitrate streams
- Decrease buffer sizes for interactive/live use cases
- Monitor dropped chunks in logs (indicates slow readers)
- Example low-latency config: channel=256KB, ring=512KB
- Example high-throughput config: channel=8MB, ring=16MB
TROUBLESHOOTING:
- Error "use stream mode": Use 'cat --stream' instead of 'cat'
- Reader disconnects: Check if writer finished (readers wait indefinitely otherwise)
- High memory usage: Reduce channel_buffer_size or limit concurrent readers
ARCHITECTURE DETAILS:
- StreamFS implements filesystem.Streamer interface
- Each reader gets a filesystem.StreamReader with independent position
- Ring buffer enables time-shifting and late joining
- Fanout is non-blocking: slow readers drop chunks, fast readers proceed
- Graceful shutdown: closing stream sends EOF to all readers
`
}
var _ plugin.ServicePlugin = (*StreamFSPlugin)(nil)
var _ filesystem.FileSystem = (*StreamFS)(nil)
var _ filesystem.HandleFS = (*StreamFS)(nil)
const maxServerStreamBufferSize = 1 * 1024 * 1024
type streamFileHandle struct {
id int64
sfs *StreamFS
path string
flags filesystem.OpenFlag
stream *StreamFile
readerID string
ch <-chan []byte
readBuffer []byte
readBase int64
readOffset int64
readClosed bool
mu sync.Mutex
}
type streamHandleManager struct {
handles map[int64]*streamFileHandle
nextID int64
mu sync.Mutex
}
var sfsHandleManager = &streamHandleManager{
handles: make(map[int64]*streamFileHandle),
nextID: 1,
}
func (sfs *StreamFS) OpenHandle(path string, flags filesystem.OpenFlag, mode uint32) (filesystem.FileHandle, error) {
if path == "/README" {
sfsHandleManager.mu.Lock()
defer sfsHandleManager.mu.Unlock()
id := sfsHandleManager.nextID
sfsHandleManager.nextID++
handle := &streamFileHandle{
id: id,
sfs: sfs,
path: path,
flags: flags,
readBuffer: []byte(getReadme()),
readClosed: true,
}
sfsHandleManager.handles[id] = handle
log.Debugf("[streamfs] Opened README handle %d", id)
return handle, nil
}
sfs.mu.Lock()
stream, exists := sfs.streams[path]
if !exists {
stream = NewStreamFile(path, sfs.channelBuffer, sfs.ringSize)
sfs.streams[path] = stream
log.Infof("[streamfs] Auto-created stream %s for handle", path)
}
sfs.mu.Unlock()
sfsHandleManager.mu.Lock()
defer sfsHandleManager.mu.Unlock()
id := sfsHandleManager.nextID
sfsHandleManager.nextID++
handle := &streamFileHandle{
id: id,
sfs: sfs,
path: path,
flags: flags,
stream: stream,
}
if flags&filesystem.O_WRONLY == 0 {
readerID, ch := stream.RegisterReader()
handle.readerID = readerID
handle.ch = ch
log.Infof("[streamfs] Opened read handle %d for %s (reader: %s)", id, path, readerID)
} else {
log.Infof("[streamfs] Opened write handle %d for %s", id, path)
}
sfsHandleManager.handles[id] = handle
return handle, nil
}
func (sfs *StreamFS) GetHandle(id int64) (filesystem.FileHandle, error) {
sfsHandleManager.mu.Lock()
defer sfsHandleManager.mu.Unlock()
handle, ok := sfsHandleManager.handles[id]
if !ok {
return nil, filesystem.ErrNotFound
}
return handle, nil
}
func (sfs *StreamFS) CloseHandle(id int64) error {
sfsHandleManager.mu.Lock()
handle, ok := sfsHandleManager.handles[id]
if !ok {
sfsHandleManager.mu.Unlock()
return filesystem.ErrNotFound
}
delete(sfsHandleManager.handles, id)
sfsHandleManager.mu.Unlock()
if handle.readerID != "" && handle.stream != nil {
handle.stream.UnregisterReader(handle.readerID)
log.Infof("[streamfs] Closed handle %d, unregistered reader %s", id, handle.readerID)
}
return nil
}
func (h *streamFileHandle) ID() int64 {
return h.id
}
func (h *streamFileHandle) Path() string {
return h.path
}
func (h *streamFileHandle) Flags() filesystem.OpenFlag {
return h.flags
}
func (h *streamFileHandle) Read(buf []byte) (int, error) {
h.mu.Lock()
defer h.mu.Unlock()
return h.readLocked(buf)
}
func (h *streamFileHandle) ReadAt(buf []byte, offset int64) (int, error) {
h.mu.Lock()
defer h.mu.Unlock()
h.drainAvailableData()
if offset < int64(len(h.readBuffer)) {
end := offset + int64(len(buf))
if end > int64(len(h.readBuffer)) {
end = int64(len(h.readBuffer))
}
n := copy(buf, h.readBuffer[offset:end])
if h.readClosed && end >= int64(len(h.readBuffer)) && n < len(buf) {
return n, io.EOF
}
return n, nil
}
if h.readClosed {
return 0, io.EOF
}
if err := h.fetchMoreData(); err != nil {
if err == io.EOF {
h.readClosed = true
return 0, io.EOF
}
return 0, err
}
if offset < int64(len(h.readBuffer)) {
end := offset + int64(len(buf))
if end > int64(len(h.readBuffer)) {
end = int64(len(h.readBuffer))
}
n := copy(buf, h.readBuffer[offset:end])
return n, nil
}
return 0, nil
}
func (h *streamFileHandle) drainAvailableData() {
if h.ch == nil {
return
}
for {
select {
case data, ok := <-h.ch:
if !ok {
h.readClosed = true
return
}
h.readBuffer = append(h.readBuffer, data...)
default:
return
}
}
}
func (h *streamFileHandle) readLocked(buf []byte) (int, error) {
relOffset := h.readOffset - h.readBase
if relOffset >= 0 && relOffset < int64(len(h.readBuffer)) {
n := copy(buf, h.readBuffer[relOffset:])
h.readOffset += int64(n)
h.trimBuffer()
return n, nil
}
if h.readClosed {
return 0, io.EOF
}
if err := h.fetchMoreData(); err != nil {
if err == io.EOF {
h.readClosed = true
return 0, io.EOF
}
return 0, err
}
relOffset = h.readOffset - h.readBase
if relOffset >= 0 && relOffset < int64(len(h.readBuffer)) {
n := copy(buf, h.readBuffer[relOffset:])
h.readOffset += int64(n)
h.trimBuffer()
return n, nil
}
return 0, nil
}
func (h *streamFileHandle) trimBuffer() {
if len(h.readBuffer) <= maxServerStreamBufferSize {
return
}
consumed := h.readOffset - h.readBase
if consumed <= 0 {
return
}
margin := int64(64 * 1024)
trimPoint := consumed - margin
if trimPoint <= 0 {
return
}
if trimPoint > 0 && trimPoint < int64(len(h.readBuffer)) {
newBuffer := make([]byte, int64(len(h.readBuffer))-trimPoint)
copy(newBuffer, h.readBuffer[trimPoint:])
h.readBuffer = newBuffer
h.readBase += trimPoint
log.Debugf("[streamfs] Trimmed handle buffer: new base=%d, new size=%d", h.readBase, len(h.readBuffer))
}
}
func (h *streamFileHandle) fetchMoreData() error {
if h.ch == nil {
return io.EOF
}
select {
case data, ok := <-h.ch:
if !ok {
return io.EOF
}
h.readBuffer = append(h.readBuffer, data...)
return nil
case <-time.After(30 * time.Second):
return nil
}
}
func (h *streamFileHandle) Write(data []byte) (int, error) {
return h.WriteAt(data, 0)
}
func (h *streamFileHandle) WriteAt(data []byte, offset int64) (int, error) {
if h.stream == nil {
return 0, fmt.Errorf("stream not initialized")
}
err := h.stream.Write(data)
if err != nil {
return 0, err
}
return len(data), nil
}
func (h *streamFileHandle) Seek(offset int64, whence int) (int64, error) {
h.mu.Lock()
defer h.mu.Unlock()
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = h.readOffset + offset
case io.SeekEnd:
newOffset = int64(len(h.readBuffer)) + offset
default:
return 0, fmt.Errorf("invalid whence: %d", whence)
}
if newOffset < 0 {
return 0, fmt.Errorf("negative offset")
}
h.readOffset = newOffset
return newOffset, nil
}
func (h *streamFileHandle) Sync() error {
return nil
}
func (h *streamFileHandle) Close() error {
h.mu.Lock()
defer h.mu.Unlock()
sfsHandleManager.mu.Lock()
delete(sfsHandleManager.handles, h.id)
sfsHandleManager.mu.Unlock()
if h.readerID != "" && h.stream != nil {
h.stream.UnregisterReader(h.readerID)
log.Infof("[streamfs] Handle %d closed, unregistered reader %s", h.id, h.readerID)
}
return nil
}
func (h *streamFileHandle) Stat() (*filesystem.FileInfo, error) {
return h.sfs.Stat(h.path)
}