package handlers
import (
"bufio"
"bytes"
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os/exec"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
"github.com/c4pt0r/agfs/agfs-server/pkg/filesystem"
"github.com/c4pt0r/agfs/agfs-server/pkg/mountablefs"
log "github.com/sirupsen/logrus"
"github.com/zeebo/xxh3"
)
// Handler serves the HTTP API on top of a filesystem.FileSystem.
type Handler struct {
	// fs is the filesystem every request handler operates on.
	fs filesystem.FileSystem
	// version is reported by the Health and Capabilities endpoints.
	version string
	// gitCommit is reported by the Health endpoint.
	gitCommit string
	// buildTime is reported by the Health endpoint.
	buildTime string
	// trafficMonitor, when non-nil, is told how many bytes handlers read and write.
	trafficMonitor *TrafficMonitor
}
// NewHandler returns a Handler that operates on fs and reports traffic to
// trafficMonitor (which may be nil). Version metadata starts out as the
// placeholders "dev"/"unknown" until SetVersionInfo replaces it.
func NewHandler(fs filesystem.FileSystem, trafficMonitor *TrafficMonitor) *Handler {
	handler := new(Handler)
	handler.fs = fs
	handler.trafficMonitor = trafficMonitor
	handler.version = "dev"
	handler.gitCommit = "unknown"
	handler.buildTime = "unknown"
	return handler
}
// SetVersionInfo overwrites the version, git commit and build time that the
// handler reports.
func (h *Handler) SetVersionInfo(version, gitCommit, buildTime string) {
	h.version, h.gitCommit, h.buildTime = version, gitCommit, buildTime
}
// ErrorResponse is the JSON body produced by writeError.
type ErrorResponse struct {
	// Error is the human-readable error message.
	Error string `json:"error"`
}
// SuccessResponse is the JSON body returned by handlers that only need to
// acknowledge an operation.
type SuccessResponse struct {
	// Message describes what was done (for example "file created").
	Message string `json:"message"`
}
// FileInfoResponse is the JSON representation of one file or directory, as
// returned by Stat and (inside ListResponse) by ListDirectory.
type FileInfoResponse struct {
	Name string `json:"name"`
	Size int64 `json:"size"`
	Mode uint32 `json:"mode"`
	// ModTime is formatted with time.RFC3339Nano by the handlers.
	ModTime string `json:"modTime"`
	IsDir bool `json:"isDir"`
	// Meta is copied verbatim from the filesystem's file info.
	Meta filesystem.MetaData `json:"meta,omitempty"`
}
// ListResponse is the JSON body returned by ListDirectory.
type ListResponse struct {
	Files []FileInfoResponse `json:"files"`
}
// WriteRequest carries file data as a JSON string.
// NOTE(review): no handler in this part of the file decodes it (WriteFile
// reads the raw request body) — confirm whether it is still used elsewhere.
type WriteRequest struct {
	Data string `json:"data"`
}
// RenameRequest is the JSON body expected by Rename.
type RenameRequest struct {
	// NewPath is the destination path; Rename rejects an empty value.
	NewPath string `json:"newPath"`
}
// ChmodRequest is the JSON body expected by Chmod.
type ChmodRequest struct {
	// Mode is passed unchanged to the filesystem's Chmod.
	Mode uint32 `json:"mode"`
}
// DigestRequest is the JSON body expected by Digest.
type DigestRequest struct {
	// Algorithm must be "xxh3" or "md5".
	Algorithm string `json:"algorithm"`
	// Path is the file to hash; Digest rejects an empty value.
	Path string `json:"path"`
}
// DigestResponse is the JSON body returned by Digest; Algorithm and Path echo
// the request.
type DigestResponse struct {
	Algorithm string `json:"algorithm"`
	Path string `json:"path"`
	// Digest is the lowercase hexadecimal digest string.
	Digest string `json:"digest"`
}
// writeJSON sets the JSON content type, writes the given status code and then
// encodes data as the response body. The encoder's error is discarded.
func writeJSON(w http.ResponseWriter, status int, data interface{}) {
	headers := w.Header()
	headers.Set("Content-Type", "application/json")
	w.WriteHeader(status)

	encoder := json.NewEncoder(w)
	_ = encoder.Encode(data)
}
func writeError(w http.ResponseWriter, status int, message string) {
writeJSON(w, status, ErrorResponse{Error: message})
}
// mapErrorToStatus translates the filesystem package's sentinel errors
// (matched with errors.Is) into HTTP status codes. Any other error maps to
// 500 Internal Server Error.
func mapErrorToStatus(err error) int {
	switch {
	case errors.Is(err, filesystem.ErrNotFound):
		return http.StatusNotFound
	case errors.Is(err, filesystem.ErrPermissionDenied):
		return http.StatusForbidden
	case errors.Is(err, filesystem.ErrInvalidArgument):
		return http.StatusBadRequest
	case errors.Is(err, filesystem.ErrAlreadyExists):
		return http.StatusConflict
	case errors.Is(err, filesystem.ErrNotSupported):
		return http.StatusNotImplemented
	default:
		return http.StatusInternalServerError
	}
}
// CreateFile creates the file named by the "path" query parameter.
// It answers 400 when the parameter is missing, the mapped filesystem status
// when creation fails, and 201 on success.
func (h *Handler) CreateFile(w http.ResponseWriter, r *http.Request) {
	target := r.URL.Query().Get("path")
	if len(target) == 0 {
		writeError(w, http.StatusBadRequest, "path parameter is required")
		return
	}

	createErr := h.fs.Create(target)
	if createErr != nil {
		writeError(w, mapErrorToStatus(createErr), createErr.Error())
		return
	}

	writeJSON(w, http.StatusCreated, SuccessResponse{Message: "file created"})
}
// CreateDirectory creates the directory named by the "path" query parameter.
// The optional "mode" parameter is parsed as an octal number that must fit in
// 32 bits; when absent the mode defaults to 0755.
func (h *Handler) CreateDirectory(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()

	dir := query.Get("path")
	if len(dir) == 0 {
		writeError(w, http.StatusBadRequest, "path parameter is required")
		return
	}

	perm := uint32(0755)
	if raw := query.Get("mode"); raw != "" {
		parsed, parseErr := strconv.ParseUint(raw, 8, 32)
		if parseErr != nil {
			writeError(w, http.StatusBadRequest, "invalid mode")
			return
		}
		perm = uint32(parsed)
	}

	mkdirErr := h.fs.Mkdir(dir, perm)
	if mkdirErr != nil {
		writeError(w, mapErrorToStatus(mkdirErr), mkdirErr.Error())
		return
	}

	writeJSON(w, http.StatusCreated, SuccessResponse{Message: "directory created"})
}
// ReadFile returns the contents of the file named by the "path" query
// parameter as application/octet-stream.
//
// Query parameters:
//   - stream=true hands the request over to streamFile.
//   - offset (decimal int64, default 0) and size (decimal int64, default -1)
//     are passed through to the filesystem's Read.
//
// io.EOF from Read is treated as success: whatever data was returned with it
// is sent with status 200. Bytes sent are reported to the traffic monitor.
func (h *Handler) ReadFile(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()

	target := query.Get("path")
	if target == "" {
		writeError(w, http.StatusBadRequest, "path parameter is required")
		return
	}

	if query.Get("stream") == "true" {
		h.streamFile(w, r, target)
		return
	}

	var offset int64
	if raw := query.Get("offset"); raw != "" {
		value, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil {
			writeError(w, http.StatusBadRequest, "invalid offset parameter")
			return
		}
		offset = value
	}

	size := int64(-1)
	if raw := query.Get("size"); raw != "" {
		value, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil {
			writeError(w, http.StatusBadRequest, "invalid size parameter")
			return
		}
		size = value
	}

	data, readErr := h.fs.Read(target, offset, size)
	if readErr != nil && readErr != io.EOF {
		writeError(w, mapErrorToStatus(readErr), readErr.Error())
		return
	}

	// Reaching here means either a clean read or EOF; both send the data.
	w.Header().Set("Content-Type", "application/octet-stream")
	w.WriteHeader(http.StatusOK)
	w.Write(data)

	if sent := len(data); sent > 0 && h.trafficMonitor != nil {
		h.trafficMonitor.RecordRead(int64(sent))
	}
}
// WriteFile replaces the contents of the file named by the "path" query
// parameter with the raw request body, creating the file if necessary
// (WriteFlagCreate|WriteFlagTruncate). The body size is reported to the
// traffic monitor before the write is attempted.
func (h *Handler) WriteFile(w http.ResponseWriter, r *http.Request) {
	target := r.URL.Query().Get("path")
	if target == "" {
		writeError(w, http.StatusBadRequest, "path parameter is required")
		return
	}

	payload, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		writeError(w, http.StatusBadRequest, "failed to read request body")
		return
	}

	if received := len(payload); received > 0 && h.trafficMonitor != nil {
		h.trafficMonitor.RecordWrite(int64(received))
	}

	written, writeErr := h.fs.Write(target, payload, -1, filesystem.WriteFlagCreate|filesystem.WriteFlagTruncate)
	if writeErr != nil {
		writeError(w, mapErrorToStatus(writeErr), writeErr.Error())
		return
	}

	summary := fmt.Sprintf("Written %d bytes", written)
	writeJSON(w, http.StatusOK, SuccessResponse{Message: summary})
}
// Delete removes the entry named by the "path" query parameter. With
// recursive=true it calls the filesystem's RemoveAll, otherwise Remove.
func (h *Handler) Delete(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()

	target := query.Get("path")
	if target == "" {
		writeError(w, http.StatusBadRequest, "path parameter is required")
		return
	}

	var removeErr error
	switch query.Get("recursive") {
	case "true":
		removeErr = h.fs.RemoveAll(target)
	default:
		removeErr = h.fs.Remove(target)
	}

	if removeErr != nil {
		writeError(w, mapErrorToStatus(removeErr), removeErr.Error())
		return
	}

	writeJSON(w, http.StatusOK, SuccessResponse{Message: "deleted"})
}
// ListDirectory lists the directory named by the "path" query parameter
// (defaulting to "/") and responds with a ListResponse.
//
// The Files slice is always non-nil, so an empty directory is encoded as
// {"files":[]} rather than {"files":null}; encoding/json renders a nil slice
// as null, which clients iterating over the array would have to special-case.
func (h *Handler) ListDirectory(w http.ResponseWriter, r *http.Request) {
	dir := r.URL.Query().Get("path")
	if dir == "" {
		dir = "/"
	}

	entries, err := h.fs.ReadDir(dir)
	if err != nil {
		writeError(w, mapErrorToStatus(err), err.Error())
		return
	}

	// Pre-size to the known entry count; also guarantees a non-nil slice.
	response := ListResponse{Files: make([]FileInfoResponse, 0, len(entries))}
	for _, f := range entries {
		response.Files = append(response.Files, FileInfoResponse{
			Name:    f.Name,
			Size:    f.Size,
			Mode:    f.Mode,
			ModTime: f.ModTime.Format(time.RFC3339Nano),
			IsDir:   f.IsDir,
			Meta:    f.Meta,
		})
	}

	writeJSON(w, http.StatusOK, response)
}
// Stat returns a FileInfoResponse for the entry named by the "path" query
// parameter. Failures are logged: "not found" at debug level, everything
// else at error level.
func (h *Handler) Stat(w http.ResponseWriter, r *http.Request) {
	target := r.URL.Query().Get("path")
	if target == "" {
		writeError(w, http.StatusBadRequest, "path parameter is required")
		return
	}

	info, statErr := h.fs.Stat(target)
	if statErr != nil {
		status := mapErrorToStatus(statErr)
		switch status {
		case http.StatusNotFound:
			log.Debugf("Stat: path not found: %s (from %s)", target, r.RemoteAddr)
		default:
			log.Errorf("Stat error for path %s: %v (from %s)", target, statErr, r.RemoteAddr)
		}
		writeError(w, status, statErr.Error())
		return
	}

	writeJSON(w, http.StatusOK, FileInfoResponse{
		Name:    info.Name,
		Size:    info.Size,
		Mode:    info.Mode,
		ModTime: info.ModTime.Format(time.RFC3339Nano),
		IsDir:   info.IsDir,
		Meta:    info.Meta,
	})
}
// Rename moves the entry named by the "path" query parameter to the newPath
// given in the JSON request body (a RenameRequest).
func (h *Handler) Rename(w http.ResponseWriter, r *http.Request) {
	source := r.URL.Query().Get("path")
	if source == "" {
		writeError(w, http.StatusBadRequest, "path parameter is required")
		return
	}

	var body RenameRequest
	decoder := json.NewDecoder(r.Body)
	if decodeErr := decoder.Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	destination := body.NewPath
	if destination == "" {
		writeError(w, http.StatusBadRequest, "newPath is required")
		return
	}

	renameErr := h.fs.Rename(source, destination)
	if renameErr != nil {
		writeError(w, mapErrorToStatus(renameErr), renameErr.Error())
		return
	}

	writeJSON(w, http.StatusOK, SuccessResponse{Message: "renamed"})
}
// Chmod applies the mode from the JSON request body (a ChmodRequest) to the
// entry named by the "path" query parameter.
func (h *Handler) Chmod(w http.ResponseWriter, r *http.Request) {
	target := r.URL.Query().Get("path")
	if target == "" {
		writeError(w, http.StatusBadRequest, "path parameter is required")
		return
	}

	var body ChmodRequest
	decoder := json.NewDecoder(r.Body)
	if decodeErr := decoder.Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}

	chmodErr := h.fs.Chmod(target, body.Mode)
	if chmodErr != nil {
		writeError(w, mapErrorToStatus(chmodErr), chmodErr.Error())
		return
	}

	writeJSON(w, http.StatusOK, SuccessResponse{Message: "permissions changed"})
}
// Digest computes a content digest of a file. The JSON request body is a
// DigestRequest; the response is a DigestResponse echoing the algorithm and
// path together with the hexadecimal digest.
//
// Validation order is: body decodes, algorithm is "xxh3" or "md5", path is
// non-empty. Each failure answers 400. Errors from the digest computation are
// mapped through mapErrorToStatus.
func (h *Handler) Digest(w http.ResponseWriter, r *http.Request) {
	var req DigestRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid request body: "+err.Error())
		return
	}

	// A single switch both validates the algorithm and selects the
	// implementation, so no unreachable second "unsupported" branch exists.
	var compute func(path string) (string, error)
	switch req.Algorithm {
	case "xxh3":
		compute = h.calculateXXH3Digest
	case "md5":
		compute = h.calculateMD5Digest
	default:
		writeError(w, http.StatusBadRequest, fmt.Sprintf("unsupported algorithm: %s (supported: xxh3, md5)", req.Algorithm))
		return
	}

	if req.Path == "" {
		writeError(w, http.StatusBadRequest, "path is required")
		return
	}

	digest, err := compute(req.Path)
	if err != nil {
		writeError(w, mapErrorToStatus(err), "failed to calculate digest: "+err.Error())
		return
	}

	writeJSON(w, http.StatusOK, DigestResponse{
		Algorithm: req.Algorithm,
		Path:      req.Path,
		Digest:    digest,
	})
}
// calculateXXH3Digest streams the file at path through an xxh3 hasher using a
// 64 KiB buffer and returns the Lo field of the 128-bit sum as 16 zero-padded
// lowercase hex digits. Read failures are wrapped; an Open failure is
// returned as-is.
func (h *Handler) calculateXXH3Digest(path string) (string, error) {
	source, openErr := h.fs.Open(path)
	if openErr != nil {
		return "", openErr
	}
	defer source.Close()

	hasher := xxh3.New()
	scratch := make([]byte, 64*1024)
	if _, copyErr := io.CopyBuffer(hasher, source, scratch); copyErr != nil {
		return "", fmt.Errorf("error reading file: %w", copyErr)
	}

	return fmt.Sprintf("%016x", hasher.Sum128().Lo), nil
}
// calculateMD5Digest streams the file at path through an MD5 hasher using a
// 64 KiB buffer and returns the digest as lowercase hex. Read failures are
// wrapped; an Open failure is returned as-is.
func (h *Handler) calculateMD5Digest(path string) (string, error) {
	source, openErr := h.fs.Open(path)
	if openErr != nil {
		return "", openErr
	}
	defer source.Close()

	hasher := md5.New()
	scratch := make([]byte, 64*1024)
	if _, copyErr := io.CopyBuffer(hasher, source, scratch); copyErr != nil {
		return "", fmt.Errorf("error reading file: %w", copyErr)
	}

	sum := hasher.Sum(nil)
	return hex.EncodeToString(sum), nil
}
// CapabilitiesResponse is the JSON body returned by Capabilities.
type CapabilitiesResponse struct {
	// Version is the handler's version string.
	Version string `json:"version"`
	// Features lists the feature names advertised to clients.
	Features []string `json:"features"`
}
// Capabilities reports the handler's version and the fixed list of feature
// names it advertises.
func (h *Handler) Capabilities(w http.ResponseWriter, r *http.Request) {
	features := []string{
		"handlefs",
		"grep",
		"digest",
		"stream",
		"touch",
	}
	writeJSON(w, http.StatusOK, CapabilitiesResponse{Version: h.version, Features: features})
}
// HealthResponse is the JSON body returned by Health.
type HealthResponse struct {
	// Status is always "healthy" when produced by Health.
	Status string `json:"status"`
	Version string `json:"version"`
	GitCommit string `json:"gitCommit"`
	BuildTime string `json:"buildTime"`
}
// Health always answers 200 with status "healthy" plus the handler's version,
// git commit and build time.
func (h *Handler) Health(w http.ResponseWriter, r *http.Request) {
	var report HealthResponse
	report.Status = "healthy"
	report.Version = h.version
	report.GitCommit = h.gitCommit
	report.BuildTime = h.buildTime
	writeJSON(w, http.StatusOK, report)
}
// Touch handles a touch request for the "path" query parameter.
//
// When the filesystem implements filesystem.Toucher, its Touch method is
// used. Otherwise a fallback is applied:
//   - if Stat fails, an empty file is written with WriteFlagCreate;
//   - if the path is a directory, the request is rejected with 400;
//   - otherwise the file is read in full and written back with
//     WriteFlagTruncate.
//
// In the fallback, io.EOF from Read is treated as a successful read, matching
// ReadFile and the grep helpers; previously it was reported to the client as
// a failure.
func (h *Handler) Touch(w http.ResponseWriter, r *http.Request) {
	path := r.URL.Query().Get("path")
	if path == "" {
		writeError(w, http.StatusBadRequest, "path parameter is required")
		return
	}

	if toucher, ok := h.fs.(filesystem.Toucher); ok {
		if err := toucher.Touch(path); err != nil {
			writeError(w, mapErrorToStatus(err), err.Error())
			return
		}
		writeJSON(w, http.StatusOK, SuccessResponse{Message: "touched"})
		return
	}

	info, err := h.fs.Stat(path)
	if err != nil {
		// Stat failed: fall back to creating an empty file.
		if _, err := h.fs.Write(path, []byte{}, -1, filesystem.WriteFlagCreate); err != nil {
			writeError(w, mapErrorToStatus(err), err.Error())
			return
		}
		writeJSON(w, http.StatusOK, SuccessResponse{Message: "touched"})
		return
	}

	if info.IsDir {
		writeError(w, http.StatusBadRequest, "cannot touch directory")
		return
	}

	// Rewrite the existing content unchanged. EOF only signals the end of the
	// data and must not abort the touch.
	data, err := h.fs.Read(path, 0, -1)
	if err != nil && !errors.Is(err, io.EOF) {
		writeError(w, mapErrorToStatus(err), err.Error())
		return
	}
	if _, err := h.fs.Write(path, data, -1, filesystem.WriteFlagTruncate); err != nil {
		writeError(w, mapErrorToStatus(err), err.Error())
		return
	}

	writeJSON(w, http.StatusOK, SuccessResponse{Message: "touched"})
}
// SetupRoutes registers every /api/v1 endpoint of the handler on mux and
// calls SetupHandleRoutes for the handle-based routes. Apart from
// /api/v1/health, which accepts any method, each route answers 405 "method
// not allowed" for methods it does not serve.
func (h *Handler) SetupRoutes(mux *http.ServeMux) {
	// reject writes the shared 405 response.
	reject := func(w http.ResponseWriter) {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
	}
	// only guards next so that it runs for exactly one HTTP method.
	only := func(method string, next http.HandlerFunc) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			if r.Method != method {
				reject(w)
				return
			}
			next(w, r)
		}
	}
	// dispatch picks the handler registered for the request's method.
	dispatch := func(table map[string]http.HandlerFunc) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			next, served := table[r.Method]
			if !served {
				reject(w)
				return
			}
			next(w, r)
		}
	}

	mux.HandleFunc("/api/v1/health", h.Health)
	mux.HandleFunc("/api/v1/capabilities", only(http.MethodGet, h.Capabilities))

	h.SetupHandleRoutes(mux)

	mux.HandleFunc("/api/v1/files", dispatch(map[string]http.HandlerFunc{
		http.MethodPost:   h.CreateFile,
		http.MethodGet:    h.ReadFile,
		http.MethodPut:    h.WriteFile,
		http.MethodDelete: h.Delete,
	}))
	mux.HandleFunc("/api/v1/directories", dispatch(map[string]http.HandlerFunc{
		http.MethodPost:   h.CreateDirectory,
		http.MethodGet:    h.ListDirectory,
		http.MethodDelete: h.Delete,
	}))

	mux.HandleFunc("/api/v1/stat", only(http.MethodGet, h.Stat))
	mux.HandleFunc("/api/v1/rename", only(http.MethodPost, h.Rename))
	mux.HandleFunc("/api/v1/chmod", only(http.MethodPost, h.Chmod))
	mux.HandleFunc("/api/v1/grep", only(http.MethodPost, h.Grep))
	mux.HandleFunc("/api/v1/digest", only(http.MethodPost, h.Digest))
	mux.HandleFunc("/api/v1/touch", only(http.MethodPost, h.Touch))
}
// streamFile serves path as a live stream. It answers 400 when the
// filesystem does not implement filesystem.Streamer and 404 when the stream
// cannot be opened; otherwise the opened stream is relayed by
// streamFromStreamReader and closed when that returns.
func (h *Handler) streamFile(w http.ResponseWriter, r *http.Request, path string) {
	streamer, supported := h.fs.(filesystem.Streamer)
	if !supported {
		writeError(w, http.StatusBadRequest, "streaming not supported for this filesystem")
		return
	}

	stream, openErr := streamer.OpenStream(path)
	if openErr != nil {
		writeError(w, http.StatusNotFound, openErr.Error())
		return
	}
	defer stream.Close()

	h.streamFromStreamReader(w, r, stream)
}
// streamFromStreamReader relays chunks from reader to the client as a chunked
// application/octet-stream response until the stream ends, a read fails, a
// write fails, or the request context is cancelled.
//
// The 200 status is written before the first read, so failures after that
// point can only be logged. Each chunk is written in pieces of at most 64 KiB
// and flushed after every piece; written bytes are reported to the traffic
// monitor as reads.
func (h *Handler) streamFromStreamReader(w http.ResponseWriter, r *http.Request, reader filesystem.StreamReader) {
	w.Header().Set("Content-Type", "application/octet-stream")
	w.Header().Set("Transfer-Encoding", "chunked")
	w.Header().Set("X-Content-Type-Options", "nosniff")
	w.WriteHeader(http.StatusOK)
	// Without a Flusher, chunks cannot be pushed to the client as they arrive.
	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Error("ResponseWriter does not support flushing")
		return
	}
	log.Debugf("Starting stream read")
	// Upper bound for a single ReadChunk call; a timeout is not fatal (see below).
	timeout := 30 * time.Second
	for {
		// Non-blocking check for client disconnect before each read.
		select {
		case <-r.Context().Done():
			log.Infof("Client disconnected from stream")
			return
		default:
		}
		chunk, eof, err := reader.ReadChunk(timeout)
		if err != nil {
			if err == io.EOF {
				log.Infof("Stream closed (EOF)")
				return
			}
			// A timeout is recognised by its message text and simply retried,
			// which also re-runs the disconnect check above.
			if err.Error() == "read timeout" {
				log.Debugf("Stream read timeout, continuing to wait...")
				continue
			}
			log.Errorf("Error reading from stream: %v", err)
			return
		}
		if len(chunk) > 0 {
			// Split large chunks so each write/flush stays at most 64 KiB.
			maxChunkSize := 64 * 1024
			offset := 0
			for offset < len(chunk) {
				// Non-blocking disconnect check between pieces.
				select {
				case <-r.Context().Done():
					log.Infof("Client disconnected while writing chunk")
					return
				default:
				}
				end := offset + maxChunkSize
				if end > len(chunk) {
					end = len(chunk)
				}
				n, writeErr := w.Write(chunk[offset:end])
				if writeErr != nil {
					log.Debugf("Error writing chunk: %v (this is normal if client disconnected)", writeErr)
					return
				}
				if h.trafficMonitor != nil && n > 0 {
					h.trafficMonitor.RecordRead(int64(n))
				}
				// Advance by the number of bytes actually written.
				offset += n
				flusher.Flush()
			}
		}
		// eof may accompany a final chunk; it is honoured after that chunk is sent.
		if eof {
			log.Debug("Stream completed (EOF)")
			return
		}
	}
}
// GrepRequest is the JSON body expected by Grep.
type GrepRequest struct {
	// Path is the file or directory to search; required.
	Path string `json:"path"`
	// Pattern is the regular expression to search for; required.
	Pattern string `json:"pattern"`
	// Recursive must be true to search a directory.
	Recursive bool `json:"recursive"`
	// CaseInsensitive enables case-insensitive matching.
	CaseInsensitive bool `json:"case_insensitive"`
	// Stream selects the newline-delimited JSON streaming response.
	Stream bool `json:"stream"`
	// NodeLimit caps the number of matches when greater than zero.
	NodeLimit int `json:"node_limit"`
}
// GrepMatch describes a single matching line.
type GrepMatch struct {
	// File is the path of the file containing the match.
	File string `json:"file"`
	// Line is the 1-based line number of the match.
	Line int `json:"line"`
	// Content is the text of the matching line.
	Content string `json:"content"`
}
// GrepResponse is the JSON body of a non-streaming Grep response.
type GrepResponse struct {
	Matches []GrepMatch `json:"matches"`
	// Count is the number of entries in Matches.
	Count int `json:"count"`
}
// localPathResolver is satisfied by filesystems that can translate one of
// their own paths into a path ripgrep can be pointed at.
// NOTE(review): presumably a path on the local disk — confirm with implementers.
type localPathResolver interface {
	ResolvePath(path string) string
}
// rgVimgrepSepRe matches the ":<line>:<column>:" separator that parseRipgrepLine
// expects between the file name and the content in ripgrep's --vimgrep output.
var rgVimgrepSepRe = regexp.MustCompile(`:(\d+):(\d+):`)
// Grep searches a file or directory for a regular expression. The JSON
// request body is a GrepRequest.
//
// The pattern is always compiled with Go's regexp package first, so an
// invalid pattern yields 400 even when ripgrep performs the actual search.
// ripgrep is used whenever resolveRipgrepPath can map the path for it;
// otherwise the search runs through the filesystem interface.
//
// With stream=true the response is newline-delimited JSON written by
// grepStreamRipgrep or grepStream. Otherwise a GrepResponse is returned.
// Searching a directory requires recursive=true.
//
// A failed search is reported with the status from mapErrorToStatus instead
// of a 200 response with no results, and "matches" is always a JSON array
// (never null) regardless of which backend produced it.
func (h *Handler) Grep(w http.ResponseWriter, r *http.Request) {
	var req GrepRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid request body: "+err.Error())
		return
	}
	if req.Path == "" {
		writeError(w, http.StatusBadRequest, "path is required")
		return
	}
	if req.Pattern == "" {
		writeError(w, http.StatusBadRequest, "pattern is required")
		return
	}

	expr := req.Pattern
	if req.CaseInsensitive {
		expr = "(?i)" + expr
	}
	re, err := regexp.Compile(expr)
	if err != nil {
		writeError(w, http.StatusBadRequest, "invalid regex pattern: "+err.Error())
		return
	}

	localPath, basePath, mountPath, useRipgrep := h.resolveRipgrepPath(req.Path)

	info, err := h.fs.Stat(req.Path)
	if err != nil {
		writeError(w, mapErrorToStatus(err), "failed to stat path: "+err.Error())
		return
	}

	if req.Stream {
		if useRipgrep {
			h.grepStreamRipgrep(w, localPath, basePath, mountPath, req.Pattern, info.IsDir, req.Recursive, req.CaseInsensitive, req.NodeLimit)
		} else {
			h.grepStream(w, req.Path, re, info.IsDir, req.Recursive, req.NodeLimit)
		}
		return
	}

	if info.IsDir && !req.Recursive {
		writeError(w, http.StatusBadRequest, "path is a directory, use recursive=true to search")
		return
	}

	var matches []GrepMatch
	switch {
	case useRipgrep:
		// ripgrep handles files and directories through the same call.
		matches, err = h.grepWithRipgrep(localPath, basePath, mountPath, req.Pattern, req.CaseInsensitive, req.NodeLimit)
	case info.IsDir:
		matches, err = h.grepDirectory(req.Path, re, req.NodeLimit)
	default:
		matches, err = h.grepFile(req.Path, re, req.NodeLimit)
	}
	if err != nil {
		writeError(w, mapErrorToStatus(err), "failed to search path: "+err.Error())
		return
	}
	if matches == nil {
		// A nil slice would be encoded as JSON null.
		matches = []GrepMatch{}
	}

	writeJSON(w, http.StatusOK, GrepResponse{
		Matches: matches,
		Count:   len(matches),
	})
}
// resolveRipgrepPath decides whether vfsPath can be searched with ripgrep.
//
// It returns (localPath, basePath, mountPath, true) when the "rg" executable
// is on PATH and the filesystem serving vfsPath implements localPathResolver:
// localPath is the resolved search target, basePath is the resolved "/" of
// that filesystem, and mountPath is where the filesystem is mounted ("/"
// when h.fs itself is the resolver). For a MountableFS the mount selected by
// findMountForPath is consulted. In every other case it returns empty strings
// and false.
func (h *Handler) resolveRipgrepPath(vfsPath string) (string, string, string, bool) {
	if _, lookErr := exec.LookPath("rg"); lookErr != nil {
		return "", "", "", false
	}

	var resolver localPathResolver
	relative, mountedAt := vfsPath, "/"

	if mfs, mountable := h.fs.(*mountablefs.MountableFS); mountable {
		mount, rel, found := findMountForPath(mfs.GetMounts(), vfsPath)
		if !found {
			return "", "", "", false
		}
		pluginResolver, capable := mount.Plugin.GetFileSystem().(localPathResolver)
		if !capable {
			return "", "", "", false
		}
		resolver, relative, mountedAt = pluginResolver, rel, mount.Path
	} else {
		direct, capable := h.fs.(localPathResolver)
		if !capable {
			return "", "", "", false
		}
		resolver = direct
	}

	target := resolver.ResolvePath(relative)
	root := resolver.ResolvePath("/")
	return target, root, mountedAt, true
}
// grepStreamRipgrep writes ripgrep results as newline-delimited JSON: one
// GrepMatch object per match, flushed immediately, followed by a summary
// object {"type":"summary","count":N} that also carries "error" when the
// search failed. A non-recursive request on a directory produces a single
// {"error": ...} object instead. The 200 status is sent before searching.
func (h *Handler) grepStreamRipgrep(w http.ResponseWriter, localPath string, basePath string, mountPath string, pattern string, isDir bool, recursive bool, caseInsensitive bool, nodeLimit int) {
	headers := w.Header()
	headers.Set("Content-Type", "application/x-ndjson")
	headers.Set("Transfer-Encoding", "chunked")
	w.WriteHeader(http.StatusOK)

	flusher, canFlush := w.(http.Flusher)
	if !canFlush {
		log.Error("Streaming not supported")
		return
	}

	out := json.NewEncoder(w)

	if isDir && !recursive {
		out.Encode(map[string]interface{}{
			"error": "path is a directory, use recursive=true to search",
		})
		flusher.Flush()
		return
	}

	sent := 0
	emit := func(match GrepMatch) error {
		// Counted before encoding, so a match that fails to encode is
		// still included in the summary count.
		sent++
		if encodeErr := out.Encode(match); encodeErr != nil {
			return encodeErr
		}
		flusher.Flush()
		return nil
	}

	// Files and directories go through the same ripgrep invocation.
	_, searchErr := h.grepWithRipgrepStream(localPath, basePath, mountPath, pattern, caseInsensitive, nodeLimit, emit)

	trailer := map[string]interface{}{
		"type":  "summary",
		"count": sent,
	}
	if searchErr != nil {
		trailer["error"] = searchErr.Error()
	}
	out.Encode(trailer)
	flusher.Flush()
}
// grepWithRipgrep runs grepWithRipgrepStream and collects every match into a
// slice. On success the slice is non-nil even when empty; on failure it
// returns nil and the error.
func (h *Handler) grepWithRipgrep(localPath string, basePath string, mountPath string, pattern string, caseInsensitive bool, nodeLimit int) ([]GrepMatch, error) {
	collected := []GrepMatch{}
	collect := func(match GrepMatch) error {
		collected = append(collected, match)
		return nil
	}

	if _, searchErr := h.grepWithRipgrepStream(localPath, basePath, mountPath, pattern, caseInsensitive, nodeLimit, collect); searchErr != nil {
		return nil, searchErr
	}
	return collected, nil
}
// grepWithRipgrepStream runs "rg --vimgrep --no-heading --color=never" on
// localPath and invokes callback once per parsed output line, with the file
// path translated back through vfsPathFromLocal.
//
// It returns the number of matches for which callback succeeded. The search
// stops early (the rg process is killed and reaped) when callback returns an
// error, when nodeLimit > 0 matches have been delivered, or when reading rg's
// output fails. Output lines that parseRipgrepLine cannot parse are skipped.
func (h *Handler) grepWithRipgrepStream(localPath string, basePath string, mountPath string, pattern string, caseInsensitive bool, nodeLimit int, callback func(GrepMatch) error) (int, error) {
	args := []string{"--vimgrep", "--no-heading", "--color=never"}
	if caseInsensitive {
		args = append(args, "-i")
	}
	if nodeLimit > 0 {
		// The total is additionally enforced in the loop below.
		args = append(args, "--max-count", strconv.Itoa(nodeLimit))
	}
	// "--" ends option parsing so the pattern is never read as a flag.
	args = append(args, "--", pattern, localPath)
	cmd := exec.Command("rg", args...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return 0, err
	}
	// stderr is captured so a failure can be reported with rg's own message.
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	if err := cmd.Start(); err != nil {
		return 0, err
	}
	count := 0
	scanner := bufio.NewScanner(stdout)
	// Allow output lines of up to 1 MiB (initial buffer 64 KiB).
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		filePath, lineNum, content, ok := parseRipgrepLine(scanner.Text())
		if !ok {
			continue
		}
		vfsPath := vfsPathFromLocal(basePath, mountPath, filePath)
		match := GrepMatch{
			File: vfsPath,
			Line: lineNum,
			Content: content,
		}
		if err := callback(match); err != nil {
			// Best-effort cleanup; the callback error is what gets reported.
			_ = cmd.Process.Kill()
			_ = cmd.Wait()
			return count, err
		}
		count++
		if nodeLimit > 0 && count >= nodeLimit {
			// Limit reached: stop rg rather than draining the rest of its output.
			_ = cmd.Process.Kill()
			_ = cmd.Wait()
			return count, nil
		}
	}
	if err := scanner.Err(); err != nil {
		_ = cmd.Process.Kill()
		_ = cmd.Wait()
		return count, err
	}
	if err := cmd.Wait(); err != nil {
		if exitErr, ok := err.(*exec.ExitError); ok {
			// Exit code 1 is treated as success (presumably rg's "no matches" status).
			if exitErr.ExitCode() == 1 {
				return count, nil
			}
			// Prefer rg's stderr text over the generic exit error.
			if stderr.Len() > 0 {
				return count, errors.New(strings.TrimSpace(stderr.String()))
			}
		}
		return count, err
	}
	return count, nil
}
// parseRipgrepLine splits one line of ripgrep --vimgrep output of the form
// "<file>:<line>:<column>:<content>" at the first ":<digits>:<digits>:"
// separator. It returns the file path, the line number and the content; the
// column is discarded. ok is false when no separator is found or the line
// number does not fit in an int.
func parseRipgrepLine(line string) (string, int, string, bool) {
	// loc holds [sepStart, sepEnd, lineStart, lineEnd, colStart, colEnd].
	loc := rgVimgrepSepRe.FindStringSubmatchIndex(line)
	if loc == nil || len(loc) < 6 {
		return "", 0, "", false
	}

	lineNum, convErr := strconv.Atoi(line[loc[2]:loc[3]])
	if convErr != nil {
		return "", 0, "", false
	}

	return line[:loc[0]], lineNum, line[loc[1]:], true
}
// vfsPathFromLocal converts localPath, a path reported by ripgrep, into the
// corresponding virtual path below mountPath, where basePath is the local
// directory that mountPath maps to.
//
// It returns:
//   - mountPath unchanged when localPath is basePath itself;
//   - mountPath (normalized to a clean, rooted path) joined with the
//     slash-separated relative path when localPath lies below basePath;
//   - localPath unchanged when it lies outside basePath or the relative path
//     cannot be computed.
func vfsPathFromLocal(basePath string, mountPath string, localPath string) string {
	rel, err := filepath.Rel(basePath, localPath)
	if err != nil {
		return localPath
	}
	rel = filepath.ToSlash(rel)
	if rel == "." {
		return mountPath
	}
	// Only a leading ".." path element means the path escapes basePath. A
	// plain prefix test would also reject in-tree names such as "..hidden".
	if rel == ".." || strings.HasPrefix(rel, "../") {
		return localPath
	}
	mountPath = path.Clean("/" + strings.TrimPrefix(mountPath, "/"))
	if mountPath == "/" {
		return "/" + rel
	}
	return mountPath + "/" + rel
}
// findMountForPath picks, among mounts, the mount point whose normalized path
// is the longest one containing targetPath. It returns that mount, the part
// of targetPath relative to it (as produced by matchMountPath) and true, or
// (nil, "", false) when no mount matches. Among equally long matches the
// first one in mounts wins.
func findMountForPath(mounts []*mountablefs.MountPoint, targetPath string) (*mountablefs.MountPoint, string, bool) {
	target := filesystem.NormalizePath(targetPath)

	var (
		winner    *mountablefs.MountPoint
		winnerRel string
	)
	longest := -1

	for _, candidate := range mounts {
		prefix := filesystem.NormalizePath(candidate.Path)
		rel, matched := matchMountPath(prefix, target)
		if matched && len(prefix) > longest {
			winner, winnerRel, longest = candidate, rel, len(prefix)
		}
	}

	if winner == nil {
		return nil, "", false
	}
	return winner, winnerRel, true
}
// matchMountPath reports whether targetPath lies at or below mountPath and,
// if so, returns the remainder relative to the mount. The root mount "/"
// matches everything and returns targetPath itself; an exact match returns
// "/"; a path below the mount returns the suffix starting at the "/" that
// follows mountPath. Matching is per path element, so "/mntx" is not below
// "/mnt".
func matchMountPath(mountPath string, targetPath string) (string, bool) {
	switch {
	case mountPath == "/":
		return targetPath, true
	case targetPath == mountPath:
		return "/", true
	case strings.HasPrefix(targetPath, mountPath+"/"):
		return targetPath[len(mountPath):], true
	default:
		return "", false
	}
}
// grepStream writes search results obtained through the filesystem interface
// as newline-delimited JSON: one GrepMatch object per match, flushed
// immediately, followed by a summary object {"type":"summary","count":N}
// that also carries "error" when the search failed. A non-recursive request
// on a directory produces a single {"error": ...} object instead. The 200
// status is sent before searching.
func (h *Handler) grepStream(w http.ResponseWriter, path string, re *regexp.Regexp, isDir bool, recursive bool, nodeLimit int) {
	headers := w.Header()
	headers.Set("Content-Type", "application/x-ndjson")
	headers.Set("Transfer-Encoding", "chunked")
	w.WriteHeader(http.StatusOK)

	flusher, canFlush := w.(http.Flusher)
	if !canFlush {
		log.Error("Streaming not supported")
		return
	}

	out := json.NewEncoder(w)

	if isDir && !recursive {
		out.Encode(map[string]interface{}{
			"error": "path is a directory, use recursive=true to search",
		})
		flusher.Flush()
		return
	}

	sent := 0
	emit := func(match GrepMatch) error {
		// Counted before encoding, so a match that fails to encode is
		// still included in the summary count.
		sent++
		if encodeErr := out.Encode(match); encodeErr != nil {
			return encodeErr
		}
		flusher.Flush()
		return nil
	}

	var searchErr error
	if isDir {
		_, searchErr = h.grepDirectoryStream(path, re, nodeLimit, emit)
	} else {
		_, searchErr = h.grepFileStream(path, re, nodeLimit, emit)
	}

	trailer := map[string]interface{}{
		"type":  "summary",
		"count": sent,
	}
	if searchErr != nil {
		trailer["error"] = searchErr.Error()
	}
	out.Encode(trailer)
	flusher.Flush()
}
// grepFileStream reads the whole file at path and calls callback for every
// line matching re, in order. Line numbers are 1-based. It returns the number
// of matches for which callback succeeded.
//
// Scanning stops once nodeLimit matches were delivered (nodeLimit <= 0 means
// unlimited) or as soon as callback returns an error, which is returned
// unchanged. io.EOF from the filesystem read is treated as success.
//
// Lines of up to 1 MiB are accepted, the same limit the ripgrep-backed search
// uses. With the scanner's 64 KiB default a single long line made the scan
// fail with bufio.ErrTooLong and hid every later match in the file.
func (h *Handler) grepFileStream(path string, re *regexp.Regexp, nodeLimit int, callback func(GrepMatch) error) (int, error) {
	const (
		initialLineBuffer = 64 * 1024
		maxLineLength     = 1024 * 1024
	)

	data, err := h.fs.Read(path, 0, -1)
	if err != nil && err != io.EOF {
		return 0, err
	}

	scanner := bufio.NewScanner(bytes.NewReader(data))
	scanner.Buffer(make([]byte, 0, initialLineBuffer), maxLineLength)

	lineNum := 1
	count := 0
	for scanner.Scan() {
		if nodeLimit > 0 && count >= nodeLimit {
			break
		}
		line := scanner.Text()
		if re.MatchString(line) {
			match := GrepMatch{
				File:    path,
				Line:    lineNum,
				Content: line,
			}
			if err := callback(match); err != nil {
				return count, err
			}
			count++
		}
		lineNum++
	}
	if err := scanner.Err(); err != nil {
		return count, err
	}
	return count, nil
}
// grepDirectoryStream walks dirPath recursively and streams matches from
// every file to callback, returning the number of matches delivered. Once
// nodeLimit matches were delivered (nodeLimit <= 0 means unlimited) the walk
// stops. Only a failure to list dirPath itself is returned; errors from
// individual entries are logged as warnings and the walk continues.
func (h *Handler) grepDirectoryStream(dirPath string, re *regexp.Regexp, nodeLimit int, callback func(GrepMatch) error) (int, error) {
	entries, listErr := h.fs.ReadDir(dirPath)
	if listErr != nil {
		return 0, listErr
	}

	delivered := 0
	for _, entry := range entries {
		if nodeLimit > 0 && delivered >= nodeLimit {
			break
		}

		child := path.Join(dirPath, entry.Name)
		// For an unlimited search this is <= 0, which the callees also
		// treat as unlimited.
		remaining := nodeLimit - delivered

		if entry.IsDir {
			found, searchErr := h.grepDirectoryStream(child, re, remaining, callback)
			delivered += found
			if searchErr != nil {
				log.Warnf("failed to search directory %s: %v", child, searchErr)
			}
			continue
		}

		found, searchErr := h.grepFileStream(child, re, remaining, callback)
		delivered += found
		if searchErr != nil {
			log.Warnf("failed to search file %s: %v", child, searchErr)
		}
	}

	return delivered, nil
}
// grepFile reads the whole file at path and returns every line matching re,
// in order, with 1-based line numbers. At most nodeLimit matches are returned
// (nodeLimit <= 0 means unlimited). io.EOF from the filesystem read is
// treated as success. On a scan error no matches are returned.
//
// Lines of up to 1 MiB are accepted, the same limit the ripgrep-backed search
// uses. With the scanner's 64 KiB default a single long line made the scan
// fail with bufio.ErrTooLong and dropped every match in the file.
func (h *Handler) grepFile(path string, re *regexp.Regexp, nodeLimit int) ([]GrepMatch, error) {
	const (
		initialLineBuffer = 64 * 1024
		maxLineLength     = 1024 * 1024
	)

	data, err := h.fs.Read(path, 0, -1)
	if err != nil && err != io.EOF {
		return nil, err
	}

	scanner := bufio.NewScanner(bytes.NewReader(data))
	scanner.Buffer(make([]byte, 0, initialLineBuffer), maxLineLength)

	var matches []GrepMatch
	lineNum := 1
	for scanner.Scan() {
		if nodeLimit > 0 && len(matches) >= nodeLimit {
			break
		}
		line := scanner.Text()
		if re.MatchString(line) {
			matches = append(matches, GrepMatch{
				File:    path,
				Line:    lineNum,
				Content: line,
			})
		}
		lineNum++
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return matches, nil
}
// grepDirectory walks dirPath recursively and collects the matches of every
// file. Collection stops once nodeLimit matches were gathered (nodeLimit <= 0
// means unlimited). Only a failure to list dirPath itself is returned; an
// entry that cannot be searched is logged as a warning and contributes no
// matches.
func (h *Handler) grepDirectory(dirPath string, re *regexp.Regexp, nodeLimit int) ([]GrepMatch, error) {
	entries, listErr := h.fs.ReadDir(dirPath)
	if listErr != nil {
		return nil, listErr
	}

	var collected []GrepMatch
	for _, entry := range entries {
		if nodeLimit > 0 && len(collected) >= nodeLimit {
			break
		}

		child := path.Join(dirPath, entry.Name)
		// For an unlimited search this is <= 0, which the callees also
		// treat as unlimited.
		remaining := nodeLimit - len(collected)

		if entry.IsDir {
			found, searchErr := h.grepDirectory(child, re, remaining)
			if searchErr != nil {
				log.Warnf("failed to search directory %s: %v", child, searchErr)
				continue
			}
			collected = append(collected, found...)
			continue
		}

		found, searchErr := h.grepFile(child, re, remaining)
		if searchErr != nil {
			log.Warnf("failed to search file %s: %v", child, searchErr)
			continue
		}
		collected = append(collected, found...)
	}

	return collected, nil
}
// LoggingMiddleware wraps next so that every request is logged at debug level
// as "<method> <path>[?<raw query>]" before being passed on.
func LoggingMiddleware(next http.Handler) http.Handler {
	logged := func(w http.ResponseWriter, r *http.Request) {
		target := r.URL.Path
		if query := r.URL.RawQuery; query != "" {
			target = target + "?" + query
		}
		log.Debugf("%s %s", r.Method, target)
		next.ServeHTTP(w, r)
	}
	return http.HandlerFunc(logged)
}