import { Provider } from "@/provider/provider"
import * as Log from "@opencode-ai/core/util/log"
import crypto from "crypto"
import { Context, Effect, Layer, Record } from "effect"
import * as Stream from "effect/Stream"
import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool as aiTool, jsonSchema } from "ai"
import type { LLMEvent } from "@opencode-ai/llm"
import { LLMClient, RequestExecutor } from "@opencode-ai/llm/route"
import type { LLMClientService } from "@opencode-ai/llm/route"
import { mergeDeep } from "remeda"
import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider"
import { ProviderTransform } from "@/provider/transform"
import { Config } from "@/config/config"
import { InstanceState } from "@/effect/instance-state"
import type { Agent } from "@/agent/agent"
import type { MessageV2 } from "./message-v2"
import { Plugin } from "@/plugin"
import { sessionChatIdMap } from "@/plugin/deveco"
import { SystemPrompt } from "./system"
import { Permission } from "@/permission"
import { PermissionID } from "@/permission/schema"
import { Bus } from "@/bus"
import { Wildcard } from "@/util/wildcard"
import { SessionID } from "@/session/schema"
import { Auth } from "@/auth"
import { InstallationVersion } from "@opencode-ai/core/installation/version"
import { EffectBridge } from "@/effect/bridge"
import { RuntimeFlags } from "@/effect/runtime-flags"
import * as Option from "effect/Option"
import * as OtelTracer from "@effect/opentelemetry/Tracer"
import { LLMAISDK } from "./llm/ai-sdk"
import { LLMNativeRuntime } from "./llm/native-runtime"
// Module-level logger created with the service name "llm"; per-request loggers are cloned from it.
const log = Log.create({ service: "llm" })
// Re-export of ProviderTransform.OUTPUT_TOKEN_MAX so the value can be read from this module.
export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
/**
 * Deep-merges `source` into `target` with remeda's `mergeDeep`.
 *
 * @param target - Base option bag.
 * @param source - Overrides; `undefined` is treated as an empty object.
 * @returns The merged option bag.
 */
const mergeOptions = (target: Record<string, any>, source: Record<string, any> | undefined): Record<string, any> => {
  const overrides = source ?? {}
  return mergeDeep(target, overrides) as Record<string, any>
}
/** Caller-supplied description of a single model request. */
export type StreamInput = {
// User message driving the request; its `system`, `tools`, `model.variant` and `id` are read when building the request.
user: MessageV2.User
// Session the request belongs to; used for log tags, plugin hooks and request headers.
sessionID: string
// When set, sent as "x-parent-session-id" for providers that are not opencode/deveco.
parentSessionID?: string
// Model to call.
model: Provider.Model
// Agent whose prompt, options, sampling settings and permission rules apply.
agent: Agent.Info
// Extra permission rules merged with the agent's rules.
permission?: Permission.Ruleset
// Additional system prompt fragments, placed after the agent prompt (or the provider default prompt).
system: string[]
// Conversation messages sent to the model.
messages: ModelMessage[]
// When true, ProviderTransform.smallOptions is used and model variants are ignored.
small?: boolean
// Candidate tools; entries disabled by permissions or by `user.tools` are filtered out before the request.
tools: Record<string, Tool>
// Forwarded to the AI SDK as `maxRetries`; defaults to 0.
retries?: number
// Forwarded unchanged as the tool choice of the request.
toolChoice?: "auto" | "required" | "none"
}
/** A StreamInput plus the abort signal used to cancel the in-flight request. */
export type StreamRequest = StreamInput & {
// Passed to tool executions and to the model call as their abort signal.
abort: AbortSignal
}
/** Public surface of the LLM service. */
export interface Interface {
// Starts a model request for `input` and yields the resulting LLM events.
readonly stream: (input: StreamInput) => Stream.Stream<LLMEvent, unknown>
}
/** Context tag for the LLM service, registered under the key "@opencode/LLM". */
export class Service extends Context.Service<Service, Interface>()("@opencode/LLM") {}
/**
 * Live layer for the LLM {@link Service}.
 *
 * Resolves its collaborators once (auth, config, provider, plugin, permission,
 * the native LLM client and runtime flags) and exposes a single `stream`
 * operation. A request goes through the native runtime when
 * `flags.experimentalNativeLlm` is set and that runtime reports the request as
 * supported; otherwise it goes through the AI SDK's `streamText`.
 */
const live: Layer.Layer<
  Service,
  never,
  | Auth.Service
  | Config.Service
  | Provider.Service
  | Plugin.Service
  | Permission.Service
  | LLMClientService
  | RuntimeFlags.Service
> = Layer.effect(
  Service,
  Effect.gen(function* () {
    const auth = yield* Auth.Service
    const config = yield* Config.Service
    const provider = yield* Provider.Service
    const plugin = yield* Plugin.Service
    const perm = yield* Permission.Service
    const llmClient = yield* LLMClient.Service
    const flags = yield* RuntimeFlags.Service

    /**
     * Builds one model request from `input`: system prompt, provider options,
     * sampling parameters, headers and the filtered tool set.
     *
     * @returns `{ type: "native", stream }` when the native runtime accepted the
     * request, otherwise `{ type: "ai-sdk", result }` holding the `streamText` result.
     */
    const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
      // One chat id per session: reuse the stored one or mint a dash-less UUID.
      let chatId = sessionChatIdMap.get(input.sessionID)
      if (!chatId) {
        chatId = crypto.randomUUID().replace(/-/g, "")
        sessionChatIdMap.set(input.sessionID, chatId)
      }
      const l = log
        .clone()
        .tag("providerID", input.model.providerID)
        .tag("modelID", input.model.id)
        .tag("session.id", input.sessionID)
        .tag("chatId", chatId)
        .tag("small", (input.small ?? false).toString())
        .tag("agent", input.agent.name)
        .tag("mode", input.agent.mode)
      l.info("stream", {
        modelID: input.model.id,
        providerID: input.model.providerID,
        chatId,
      })
      // Captured up front so `onError` can report how long the request ran.
      const requestStartTime = Date.now()
      const [language, cfg, item, info] = yield* Effect.all(
        [
          provider.getLanguage(input.model),
          config.get(),
          provider.getProvider(input.model.providerID),
          auth.get(input.model.providerID),
        ],
        { concurrency: "unbounded" },
      )
      const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"

      // The initial system prompt is a single entry: agent prompt (or the
      // provider default), then caller fragments, then the user's own system text.
      const system: string[] = []
      system.push(
        [
          ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
          ...input.system,
          ...(input.user.system ? [input.user.system] : []),
        ]
          .filter((x) => x)
          .join("\n"),
      )
      const header = system[0]
      yield* plugin.trigger(
        "experimental.chat.system.transform",
        { sessionID: input.sessionID, model: input.model },
        { system },
      )
      // If plugins appended entries but left the first one untouched, collapse
      // everything after it into one entry so the prompt stays two-part.
      // NOTE(review): presumably this keeps a stable prefix for prompt caching — confirm.
      if (system.length > 2 && system[0] === header) {
        const rest = system.slice(1)
        system.length = 0
        system.push(header, rest.join("\n"))
      }

      const variant =
        !input.small && input.model.variants && input.user.model.variant
          ? input.model.variants[input.user.model.variant]
          : {}
      const base = input.small
        ? ProviderTransform.smallOptions(input.model)
        : ProviderTransform.options({
            model: input.model,
            sessionID: input.sessionID,
            providerOptions: item.options,
          })
      // Later sources win: base < model options < agent options < variant.
      const options = mergeOptions(mergeOptions(mergeOptions(base, input.model.options), input.agent.options), variant)
      if (isOpenaiOauth) {
        options.instructions = system.join("\n")
      }

      const isWorkflow = language instanceof GitLabWorkflowLanguageModel
      // OpenAI OAuth receives the prompt through `options.instructions` and the
      // workflow model through `systemPrompt`, so neither gets system messages.
      const messages =
        isOpenaiOauth || isWorkflow
          ? input.messages
          : [
              ...system.map(
                (x): ModelMessage => ({
                  role: "system",
                  content: x,
                }),
              ),
              ...input.messages,
            ]

      const params = yield* plugin.trigger(
        "chat.params",
        {
          sessionID: input.sessionID,
          agent: input.agent.name,
          model: input.model,
          provider: item,
          message: input.user,
        },
        {
          temperature: input.model.capabilities.temperature
            ? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
            : undefined,
          topP: input.agent.topP ?? ProviderTransform.topP(input.model),
          topK: ProviderTransform.topK(input.model),
          maxOutputTokens: ProviderTransform.maxOutputTokens(input.model, flags.outputTokenMax),
          options,
        },
      )
      const { headers } = yield* plugin.trigger(
        "chat.headers",
        {
          sessionID: input.sessionID,
          agent: input.agent.name,
          model: input.model,
          provider: item,
          message: input.user,
        },
        {
          headers: {},
        },
      )

      const tools = resolveTools(input)
      // github-copilot: when the history contains tool calls/results but no tool
      // is enabled, register a placeholder tool that exists only for API compatibility.
      if (
        input.model.providerID.includes("github-copilot") &&
        Object.keys(tools).length === 0 &&
        hasToolCalls(input.messages)
      ) {
        tools["_noop"] = aiTool({
          description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
          inputSchema: jsonSchema({
            type: "object",
            properties: {
              reason: { type: "string", description: "Unused" },
            },
          }),
          execute: async () => ({ output: "", title: "", metadata: {} }),
        })
      }
      // Tools are sent in name order so the tool list is deterministic.
      const sortedTools = Object.fromEntries(Object.entries(tools).toSorted(([a], [b]) => a.localeCompare(b)))

      if (language instanceof GitLabWorkflowLanguageModel) {
        const workflowModel = language as GitLabWorkflowLanguageModel & {
          sessionID?: string
          sessionPreapprovedTools?: string[]
          approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
        }
        workflowModel.sessionID = input.sessionID
        workflowModel.systemPrompt = system.join("\n")
        // Runs a tool requested by the workflow model and reports either its
        // output or an error string; it never rejects.
        workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
          const t = sortedTools[toolName]
          if (!t || !t.execute) {
            return { result: "", error: `Unknown tool: ${toolName}` }
          }
          try {
            const result = await t.execute!(JSON.parse(argsJson), {
              toolCallId: _requestID,
              messages: input.messages,
              abortSignal: input.abort,
            })
            const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
            return {
              result: output,
              metadata: typeof result === "object" ? result?.metadata : undefined,
              title: typeof result === "object" ? result?.title : undefined,
            }
          } catch (e: unknown) {
            // Optional chaining keeps a thrown `null`/`undefined` from raising a
            // TypeError here, which would reject instead of reporting the failure.
            const detail = (e as { message?: unknown } | null | undefined)?.message ?? e
            return { result: "", error: String(detail) }
          }
        }
        const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
        // A tool is pre-approved unless the last rule matching its name says "ask".
        workflowModel.sessionPreapprovedTools = Object.keys(sortedTools).filter((name) => {
          const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
          return !match || match.action !== "ask"
        })
        const bridge = yield* EffectBridge.make()
        // Tool names the user already approved through this handler.
        const approvedToolsForSession = new Set<string>()
        workflowModel.approvalHandler = bridge.bind(async (approvalTools) => {
          const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
          if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
            return { approved: true }
          }
          const id = PermissionID.ascending()
          let unsub: (() => void) | undefined
          try {
            // NOTE(review): this handler only evaluates `reply` and discards it, so the
            // subscription has no visible effect here — confirm whether it can be removed.
            unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
              if (evt.properties.requestID === id) void evt.properties.reply
            })
            // Show "<tool>: <title or name>" when the arguments parse and carry one,
            // otherwise just the tool name.
            const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
              try {
                const parsed = JSON.parse(t.args) as Record<string, unknown>
                const title = (parsed?.title ?? parsed?.name ?? "") as string
                return title ? `${t.name}: ${title}` : t.name
              } catch {
                return t.name
              }
            })
            const uniquePatterns = [...new Set(toolPatterns)] as string[]
            await bridge.promise(
              perm.ask({
                id,
                sessionID: SessionID.make(input.sessionID),
                permission: "workflow_tool_approval",
                patterns: uniquePatterns,
                metadata: { tools: approvalTools },
                always: uniquePatterns,
                ruleset: [],
              }),
            )
            for (const name of uniqueNames) approvedToolsForSession.add(name)
            // Deduplicate so partially overlapping approvals do not grow the list.
            workflowModel.sessionPreapprovedTools = [
              ...new Set([...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames]),
            ]
            return { approved: true }
          } catch {
            // Any failure of the permission request is reported as "not approved".
            return { approved: false }
          } finally {
            unsub?.()
          }
        })
      }

      const tracer = cfg.experimental?.openTelemetry
        ? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
        : undefined
      // Wrap the tracer so every span it starts carries the session id.
      const telemetryTracer = tracer
        ? new Proxy(tracer, {
            get(target, prop, receiver) {
              if (prop !== "startSpan") return Reflect.get(target, prop, receiver)
              return (...args: Parameters<typeof target.startSpan>) => {
                const span = target.startSpan(...args)
                span.setAttribute("session.id", input.sessionID)
                return span
              }
            },
          })
        : undefined

      // opencode/deveco providers get the x-deveco-* headers; all others get
      // session-affinity headers.
      const isFirstPartyProvider =
        input.model.providerID.startsWith("opencode") || input.model.providerID.startsWith("deveco")
      const opencodeProjectID = isFirstPartyProvider ? (yield* InstanceState.context).project.id : undefined
      // Model headers override the defaults and plugin headers override both.
      const requestHeaders = {
        ...(isFirstPartyProvider
          ? {
              ...(opencodeProjectID ? { "x-deveco-project": opencodeProjectID } : {}),
              "x-deveco-session": input.sessionID,
              "x-deveco-request": input.user.id,
              "x-deveco-client": flags.client,
              "User-Agent": `deveco/${InstallationVersion}`,
            }
          : {
              "x-session-affinity": input.sessionID,
              ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
              "User-Agent": `deveco/${InstallationVersion}`,
            }),
        ...input.model.headers,
        ...headers,
      }

      // Annotations for the single "llm runtime selected" log on the AI SDK path;
      // a native fallback adds its reason here instead of logging a second event.
      const runtimeAnnotations: Record<string, unknown> = {
        "llm.runtime": "ai-sdk",
        "llm.provider": input.model.providerID,
        "llm.model": input.model.id,
      }
      if (flags.experimentalNativeLlm) {
        const native = LLMNativeRuntime.stream({
          model: input.model,
          provider: item,
          auth: info,
          llmClient,
          isOpenaiOauth,
          system,
          messages,
          tools: sortedTools,
          toolChoice: input.toolChoice,
          temperature: params.temperature,
          topP: params.topP,
          topK: params.topK,
          maxOutputTokens: params.maxOutputTokens,
          providerOptions: params.options,
          headers: requestHeaders,
          abort: input.abort,
        })
        if (native.type === "supported") {
          yield* Effect.logInfo("llm runtime selected").pipe(
            Effect.annotateLogs({
              "llm.runtime": "native",
              "llm.provider": input.model.providerID,
              "llm.model": input.model.id,
            }),
          )
          return {
            type: "native" as const,
            stream: native.stream,
          }
        }
        runtimeAnnotations["llm.native_unsupported_reason"] = native.reason
        l.info("native runtime unavailable; falling back to ai-sdk", { reason: native.reason })
      }

      l.info("model request starting", {
        temperature: params.temperature,
        maxOutputTokens: params.maxOutputTokens,
        messageCount: messages.length,
        toolCount: Object.keys(tools).length,
        toolChoice: input.toolChoice,
        maxRetries: input.retries ?? 0,
      })
      yield* Effect.logInfo("llm runtime selected").pipe(Effect.annotateLogs(runtimeAnnotations))
      return {
        type: "ai-sdk" as const,
        result: streamText({
          onError(error) {
            const duration = Date.now() - requestStartTime
            l.error("stream error", {
              error: error instanceof Error ? error.message : String(error),
              errorType: error instanceof Error ? error.constructor.name : typeof error,
              duration,
            })
          },
          // Repairs a failed tool call: retry under the lower-cased tool name when
          // that tool exists, otherwise route the call to the "invalid" tool with
          // the original name and error as its input.
          async experimental_repairToolCall(failed) {
            const lower = failed.toolCall.toolName.toLowerCase()
            if (lower !== failed.toolCall.toolName && sortedTools[lower]) {
              l.info("repairing tool call", {
                tool: failed.toolCall.toolName,
                repaired: lower,
              })
              return {
                ...failed.toolCall,
                toolName: lower,
              }
            }
            return {
              ...failed.toolCall,
              input: JSON.stringify({
                tool: failed.toolCall.toolName,
                error: failed.error.message,
              }),
              toolName: "invalid",
            }
          },
          temperature: params.temperature,
          topP: params.topP,
          topK: params.topK,
          providerOptions: ProviderTransform.providerOptions(input.model, params.options),
          // "invalid" stays registered for repairs but is not offered to the model.
          activeTools: Object.keys(sortedTools).filter((x) => x !== "invalid"),
          tools: sortedTools,
          toolChoice: input.toolChoice,
          maxOutputTokens: params.maxOutputTokens,
          abortSignal: input.abort,
          headers: requestHeaders,
          maxRetries: input.retries ?? 0,
          messages,
          model: wrapLanguageModel({
            model: language,
            middleware: [
              {
                specificationVersion: "v3" as const,
                // Only streaming calls get their prompt rewritten by ProviderTransform.message.
                async transformParams(args) {
                  if (args.type === "stream") {
                    args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
                  }
                  return args.params
                },
              },
            ],
          }),
          experimental_telemetry: {
            isEnabled: cfg.experimental?.openTelemetry,
            functionId: "session.llm",
            tracer: telemetryTracer,
            metadata: {
              userId: cfg.username ?? "unknown",
              sessionId: input.sessionID,
            },
          },
        }),
      }
    })

    /**
     * Runs a request inside a scope that owns an AbortController; the controller
     * is aborted when the scope is released. AI SDK stream parts are converted to
     * LLM events; a native stream is returned as is.
     */
    const stream: Interface["stream"] = (input) =>
      Stream.scoped(
        Stream.unwrap(
          Effect.gen(function* () {
            const ctrl = yield* Effect.acquireRelease(
              Effect.sync(() => new AbortController()),
              (ctrl) => Effect.sync(() => ctrl.abort()),
            )
            const result = yield* run({ ...input, abort: ctrl.signal })
            if (result.type === "native") return result.stream
            const state = LLMAISDK.adapterState()
            return Stream.fromAsyncIterable(result.result.fullStream, (e) =>
              e instanceof Error ? e : new Error(String(e)),
            ).pipe(
              Stream.mapEffect((event) => LLMAISDK.toLLMEvents(state, event)),
              Stream.flatMap((events) => Stream.fromIterable(events)),
            )
          }),
        ),
      )

    return Service.of({ stream })
  }),
)
/** The live LLM layer with Permission.defaultLayer already provided; the remaining requirements are left to the caller. */
export const layer = live.pipe(Layer.provide(Permission.defaultLayer))
/**
 * `layer` with the default Auth, Config, Provider, Plugin, LLM client and
 * RuntimeFlags layers provided. Wrapped in `Layer.suspend`, so the composition
 * is built when the layer is first evaluated rather than at module load.
 */
export const defaultLayer = Layer.suspend(() => {
  // The LLM client needs a request executor; wire in the default one.
  const llmClientLayer = LLMClient.layer.pipe(Layer.provide(RequestExecutor.defaultLayer))
  return layer.pipe(
    Layer.provide(Auth.defaultLayer),
    Layer.provide(Config.defaultLayer),
    Layer.provide(Provider.defaultLayer),
    Layer.provide(Plugin.defaultLayer),
    Layer.provide(llmClientLayer),
    Layer.provide(RuntimeFlags.defaultLayer),
  )
})
/**
 * Filters the candidate tools down to those usable for this request.
 *
 * A tool is dropped when the user message sets it to `false` in `user.tools`,
 * or when the merged agent and request permission rules disable it.
 *
 * @param input - The tools, agent, optional permission rules and user message of the request.
 * @returns The subset of `input.tools` that remains enabled.
 */
function resolveTools(input: Pick<StreamInput, "tools" | "agent" | "permission" | "user">) {
  const toolNames = Object.keys(input.tools)
  const ruleset = Permission.merge(input.agent.permission, input.permission ?? [])
  const disabled = Permission.disabled(toolNames, ruleset)
  return Record.filter(input.tools, (_tool, name) => {
    if (input.user.tools?.[name] === false) return false
    return !disabled.has(name)
  })
}
/**
 * Reports whether any message carries a tool call or a tool result.
 *
 * Messages whose `content` is not an array are skipped.
 *
 * @param messages - Conversation messages to inspect.
 * @returns `true` if at least one content part has type "tool-call" or "tool-result".
 */
export function hasToolCalls(messages: ModelMessage[]): boolean {
  return messages.some(
    (message) =>
      Array.isArray(message.content) &&
      message.content.some((part) => part.type === "tool-call" || part.type === "tool-result"),
  )
}
// Exposes the exports of "./llm" under the `LLM` namespace.
// NOTE(review): "./llm" looks like this very module (a self re-export) — confirm against the file name.
export * as LLM from "./llm"