ce0e006c创建于 2025年4月24日历史提交
/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
 */
package filesystem

import magic.dsl.*
import magic.prelude.*
import magic.config.Config
import magic.mcp.StdioMCPServer

import std.collection.ArrayList
import std.fs.{Directory, Path, File, FSException, OpenOption}
import std.io.StringReader
@When[cjc_version < "0.56.4"]
import std.os.process.Process
//--------------------------------------------------------
@When[cjc_version >= "0.56.4"]
import std.process.Process

import log.LogLevel

var ALLOWED_DIR = ""

func checkPath(path: String): (Bool, String) {
    let p = Path(path)
    if (!p.isAbsolute()) {
        return (false, "${path} is not an absolute path")
    }
    if (!path.startsWith(ALLOWED_DIR)) {
        return (false, "${path} is not allowed")
    }
    return (true, "")
}

@tool[description: "Get the allowed directory"]
func getAllowedDir(): String {
    if (ALLOWED_DIR.isEmpty()) {
        return "No allowed directory"
    } else {
        return ALLOWED_DIR
    }
}

//---------------------------------------------------------------------

@tool[
    description: "List all files and subdirectories in a directory",
    parameters: {
        path: "Absolute path of the directory to list"
    }
]
func listDir(path: String): String {
    let (status, msg) = checkPath(path)
    if (!status) { return msg }
    let result = ArrayList<String>()
    try {
        let dir = Directory(path)
        for (info in dir.entryList()) {
            result.append(info.path.toString())
        }
    } catch (_: FSException) {
        return "${path} not existed"
    }
    return result.toString()
}

//---------------------------------------------------------------------

@tool[
    description: "Read file content",
    parameters: {
        path: "Absolute path of the file to read"
    }
]
func readFile(path: String): String {
    let (status, msg) = checkPath(path)
    if (!status) { return msg }
    try (file = File(path, OpenOption.Open(true, false))) {
        let reader = StringReader(file)
        return reader.readToEnd()
    } catch(_: FSException) {
        return "${path} not existed"
    }
    return "Unreachable"
}

@tool[
    description: "Recursively search for files and directories matching a pattern. Searches through all subdirectories from the starting path. The search is case-insensitive and matches partial names. Returns full paths to all matching items. Great for finding files when you don't know their exact location. Only searches within allowed directories.",
    parameters: {
        path: "Absolute path of the directory to search",
        pattern: "There are two kinds of patters: 1. extension pattern, like .txt, .json; 2. normal pattern, part of the path to match"
    }
]
func searchFile(path: String, pattern: String): String {
    let (status, msg) = checkPath(path)
    if (!status) { return msg }
    try {
        let result = searchFileImpl(path, pattern)
        if (result.isEmpty()) {
            return "Nothing found"
        } else {
            return String.join(result.toArray(), delimiter: "\n")
        }
    } catch(_: FSException) {
        return "${path} not existed"
    }
}

func searchFileImpl(path: String, pattern: String): ArrayList<String> {
    let result = ArrayList<String>()
    for (entry in Directory(path).entryList()) {
        if (entry.isDirectory()) {
            let subFiles = searchFileImpl(entry.path.toString(), pattern)
            for (file in subFiles) {
                result.append(file)
            }
        } else {
            if (entry.path.toString().contains(pattern)) {
                result.append(entry.path.toString())
            }
        }
    }
    return result
}

//---------------------------------------------------------------------

@agent[
    model: "siliconflow:deepseek-ai/DeepSeek-V3",
    executor: "naive"
]
class SummaryAgent {
    @prompt(
        "Summarize the input as markdown"
        "The generated content is fluent, as if it were written by a professional."
    )
}

@tool[
    description: "Summarize the file content as markdown",
    parameters: {
        path: "Absolute path of the file to read"
    }
]
func summarizeFile(path: String): String {
    let (status, msg) = checkPath(path)
    if (!status) { return msg }
    try (file = File(path, OpenOption.Open(true, false))) {
        let reader = StringReader(file)
        let content = reader.readToEnd()
        let agent = SummaryAgent()
        return agent.chat("Input: ${content}")
    } catch(_: FSException) {
        return "${path} not existed"
    }
    return "Unreachable"
}
//---------------------------------------------------------------------

main() {
    Config.logLevel = LogLevel.INFO
    Config.logFile = "./logs/mcp-server.log"

    let args = Process.current.arguments
    if (args.size != 2 || args[0] != "--allowed-dir") {
        println("Invalid arguments: `${args}`")
        return
    } else {
        ALLOWED_DIR = args[1]
    }

    StdioMCPServer.startWith([getAllowedDir, listDir, readFile, searchFile, summarizeFile])
}