import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, NamedTuple, Optional, Union
from typing_extensions import override
SLOTS = list[Union[str, set[str], dict[str, str]]]
class FunctionCall(NamedTuple):
name: str
arguments: str
DEFAULT_TOOL_PROMPT = (
"You have access to the following tools:\n{tool_text}"
"Use the following format if using a tool:\n"
"```\n"
"Action: tool name (one of [{tool_names}])\n"
"Action Input: the input to the tool, in a JSON format representing the kwargs "
"""(e.g. ```{{"input": "hello world", "num_beams": 5}}```)\n"""
"```\n"
)
QWEN_TOOL_PROMPT = (
"\n\n# Tools\n\nYou may call one or more functions to assist with the user query.\n\n"
"You are provided with function signatures within <tools></tools> XML tags:\n<tools>{tool_text}"
"\n</tools>\n\nFor each function call, return a json object with function name and arguments within "
"""<tool_call></tool_call> XML tags:\n<tool_call>\n{{"name": <function-name>, """
""""arguments": <args-json-object>}}\n</tool_call>"""
)
@dataclass
class ToolUtils(ABC):
"""Base class for tool utilities."""
@staticmethod
@abstractmethod
def tool_formatter(tools: list[dict[str, Any]]) -> str:
r"""Generate the system message describing all the available tools."""
...
@staticmethod
@abstractmethod
def function_formatter(functions: list["FunctionCall"]) -> str:
r"""Generate the assistant message including all the tool calls."""
...
@staticmethod
@abstractmethod
def tool_extractor(content: str) -> Union[str, list["FunctionCall"]]:
r"""Extract all the function calls from the assistant message.
It should be an inverse function of `function_formatter`.
"""
...
class DefaultToolUtils(ToolUtils):
r"""Default tool using template."""
@staticmethod
def tool_formatter(tools: list[dict[str, Any]]) -> str:
tool_text = ""
tool_names = []
for tool in tools:
tool = tool.get("function", "") if tool.get("type") == "function" else tool
param_text = ""
for name, param in tool["parameters"]["properties"].items():
required, enum, items = "", "", ""
if name in tool["parameters"].get("required", []):
required = ", required"
if param.get("enum", None):
enum = ", should be one of [{}]".format(", ".join(param["enum"]))
if param.get("items", None):
items = ", where each item should be {}".format(param["items"].get("type", ""))
param_text += " - {name} ({type}{required}): {desc}{enum}{items}\n".format(
name=name,
type=param.get("type", ""),
required=required,
desc=param.get("description", ""),
enum=enum,
items=items,
)
tool_text += "> Tool Name: {name}\nTool Description: {desc}\nTool Args:\n{args}\n".format(
name=tool["name"], desc=tool.get("description", ""), args=param_text
)
tool_names.append(tool["name"])
return DEFAULT_TOOL_PROMPT.format(tool_text=tool_text, tool_names=", ".join(tool_names))
@staticmethod
def function_formatter(functions: list["FunctionCall"]) -> str:
return "\n".join([f"Action: {name}\nAction Input: {arguments}" for name, arguments in functions])
@staticmethod
def tool_extractor(content: str) -> Union[str, list["FunctionCall"]]:
regex = re.compile(r"Action:\s*([a-zA-Z0-9_]+)\s*Action Input:\s*(.+?)(?=\s*Action:|\s*$)", re.DOTALL)
action_match: list[tuple[str, str]] = re.findall(regex, content)
if not action_match:
return content
results = []
for match in action_match:
tool_name = match[0].strip()
tool_input = match[1].strip().strip('"').strip("```")
try:
arguments = json.loads(tool_input)
results.append(FunctionCall(tool_name, json.dumps(arguments, ensure_ascii=False)))
except json.JSONDecodeError:
return content
return results
class QwenToolUtils(ToolUtils):
r"""Qwen 2.5 tool using template."""
@staticmethod
def tool_formatter(tools: list[dict[str, Any]]) -> str:
tool_text = ""
for tool in tools:
wrapped_tool = tool if tool.get("type") == "function" else {"type": "function", "function": tool}
tool_text += "\n" + json.dumps(wrapped_tool, ensure_ascii=False)
return QWEN_TOOL_PROMPT.format(tool_text=tool_text)
@staticmethod
def function_formatter(functions: list["FunctionCall"]) -> str:
function_texts = [
json.dumps({"name": name, "arguments": json.loads(arguments)}, ensure_ascii=False)
for name, arguments in functions
]
return "\n".join([f"<tool_call>\n{text}\n</tool_call>" for text in function_texts])
@staticmethod
def tool_extractor(content: str) -> Union[str, list["FunctionCall"]]:
regex = re.compile(r"<tool_call>(.+?)</tool_call>(?=\s*<tool_call>|\s*$)", re.DOTALL)
tool_match: list[str] = re.findall(regex, content)
if not tool_match:
return content
results = []
for tool in tool_match:
try:
tool = json.loads(tool.strip())
except json.JSONDecodeError:
return content
if "name" not in tool or "arguments" not in tool:
return content
results.append(FunctionCall(tool["name"], json.dumps(tool["arguments"], ensure_ascii=False)))
return results
TOOLS = {
"default": DefaultToolUtils(),
"qwen": QwenToolUtils(),
}
def get_tool_utils(name: str) -> "ToolUtils":
tool_utils = TOOLS.get(name, None)
if tool_utils is None:
raise ValueError(f"Tool utils `{name}` not found.")
return tool_utils
@dataclass
class Formatter(ABC):
slots: SLOTS = field(default_factory=list)
tool_format: Optional[str] = None
@abstractmethod
def apply(self, **kwargs) -> SLOTS:
r"""Forms a list of slots according to the inputs to encode."""
...
def extract(self, content: str) -> Union[str, list["FunctionCall"]]:
r"""Extract a list of tuples from the response message if using tools.
Each tuple consists of function name and function arguments.
"""
raise NotImplementedError
@dataclass
class EmptyFormatter(Formatter):
def __post_init__(self):
has_placeholder = False
for slot in filter(lambda s: isinstance(s, str), self.slots):
if re.search(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}", slot):
has_placeholder = True
if has_placeholder:
raise ValueError("Empty formatter should not contain any placeholder.")
def apply(self, **kwargs) -> SLOTS:
return self.slots
@dataclass
class StringFormatter(Formatter):
def __post_init__(self):
has_placeholder = False
for slot in filter(lambda s: isinstance(s, str), self.slots):
if re.search(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}", slot):
has_placeholder = True
if not has_placeholder:
raise ValueError("A placeholder is required in the string formatter.")
def apply(self, **kwargs) -> SLOTS:
elements = []
for slot in self.slots:
if isinstance(slot, str):
for name, value in kwargs.items():
if not isinstance(value, str):
raise RuntimeError(f"Expected a string, got {value}")
slot = slot.replace("{{" + name + "}}", value, 1)
elements.append(slot)
elif isinstance(slot, (dict, set)):
elements.append(slot)
else:
raise RuntimeError(f"Input must be string, set[str] or dict[str, str], got {type(slot)}.")
return elements
@dataclass
class FunctionFormatter(StringFormatter):
def __post_init__(self):
super().__post_init__()
self.tool_utils = get_tool_utils(self.tool_format)
def apply(self, **kwargs) -> SLOTS:
content: str = kwargs.pop("content")
thought_words, thought = kwargs.pop("thought_words", None), None
if thought_words and len(thought_words) == 2:
regex = re.compile(rf"{re.escape(thought_words[0])}(.*?){re.escape(thought_words[1])}", re.DOTALL)
thought = re.search(regex, content)
if thought:
content = content.replace(thought.group(0), "")
functions: list[FunctionCall] = []
try:
tool_calls = json.loads(content)
if not isinstance(tool_calls, list):
tool_calls = [tool_calls]
for tool_call in tool_calls:
functions.append(
FunctionCall(tool_call["name"], json.dumps(tool_call["arguments"], ensure_ascii=False))
)
except json.JSONDecodeError:
raise RuntimeError(f"Invalid JSON format in function message: {str([content])}.")
function_str = self.tool_utils.function_formatter(functions)
if thought:
function_str = thought.group(0) + function_str
return super().apply(content=function_str)
@dataclass
class ToolFormatter(Formatter):
def __post_init__(self):
self.tool_utils = get_tool_utils(self.tool_format)
def apply(self, **kwargs) -> SLOTS:
content = kwargs.pop("content")
try:
tools = json.loads(content)
return [self.tool_utils.tool_formatter(tools) if len(tools) != 0 else ""]
except json.JSONDecodeError:
raise RuntimeError(f"Invalid JSON format in tool description: {str([content])}.")
def extract(self, content: str) -> Union[str, list["FunctionCall"]]:
return self.tool_utils.tool_extractor(content)