Bring your own LLM
Core ADK ships with no default model client. There is no hidden model provider, no automatic API key resolution, and no fallback model. If you want a ready-to-use LLM runtime, opt in to one of the first-party batteries: OpenAIChatCompletionsAdapter or WebLLMChatCompletionsAdapter.
If you do not want to write a custom executor, you do not have to. You must still provide your TurnRunner callbacks; LLM batteries only satisfy the executor seam. See Batteries. Wiring an off-the-shelf battery takes a single executorCallback entry:
import { TurnRunner } from '@nhtio/adk'
import { OpenAIChatCompletionsAdapter } from '@nhtio/adk/batteries/llm'
const runner = new TurnRunner({
...callbacks, // the TurnRunner callbacks you must still provide
executorCallback: new OpenAIChatCompletionsAdapter({
model: 'gpt-4o',
apiKey: process.env.OPENAI_API_KEY,
autoAck: true,
}).executor(),
})
autoAck: true is required here because the adapter's executor does not call ctx.ack() by default: the implementor owns turn completion, and autoAck: true restores single-shot behavior.
The executorCallback, whether you write it or take it from a battery, is the single required seam between the ADK runtime and your choice of intelligence.
If your agent loops, hallucinates, or drops its connection, your executor is the first place to look. ADK provides the rails; you provide the model call, stream parser, tool loop, retry policy, and terminal signal (ack or nack).
The Executor Slot
ADK has no opinion about how decisions are made. It does not parse system instructions, format conversation history, or call model APIs. The executor is a slot: a function that bridges ADK's DispatchContext to your LLM client, rules engine, or custom state machine.
The "model" doesn't have to be an LLM
The DispatchExecutorFn can wrap a model, a hardcoded decision tree, or a remote agent. The rest of the ADK runtime—turns, tools, state, event streaming—operates identically. See How agents work for details.
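As an illustration, here is an executor with no model at all: it answers from a small ordered list of keyword rules. This is a minimal sketch under stated assumptions. To stay runnable without the package, it declares simplified local stand-ins (ChatMessage, MiniDispatchContext) that mirror only the DispatchContext members it uses, and it stores plain objects rather than ADK's Message class. makeRulesExecutor and the rules are hypothetical.

```typescript
// Simplified stand-ins for the ADK shapes used here (assumption: the real
// DispatchContext and Message carry more members than this).
interface ChatMessage {
  role: 'system' | 'user' | 'assistant'
  content: string
}

interface MiniDispatchContext {
  turnMessages: Set<ChatMessage>
  storeMessage(m: ChatMessage): Promise<void>
  ack(): void
  nack(error?: Error): void
}

// A rule pairs a predicate over the latest user text with a canned reply.
interface Rule {
  matches(text: string): boolean
  reply: string
}

// Hypothetical factory: the first matching rule wins; otherwise the fallback is used.
function makeRulesExecutor(rules: Rule[], fallback: string) {
  return async (ctx: MiniDispatchContext): Promise<void> => {
    try {
      const userMessages = Array.from(ctx.turnMessages).filter((m) => m.role === 'user')
      const text = (userMessages[userMessages.length - 1]?.content ?? '').toLowerCase()
      const rule = rules.find((r) => r.matches(text))
      await ctx.storeMessage({ role: 'assistant', content: rule ? rule.reply : fallback })
      ctx.ack()
    } catch (error) {
      ctx.nack(error instanceof Error ? error : new Error(String(error)))
    }
  }
}
```

The signalling and persistence are the same as in an LLM-backed executor. Only the decision step differs.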
These rules are the boundary:
- No primary reasoning loops in pipelines. Pipelines must never host the agent's primary reasoning loop. Secondary preprocessing (e.g. query rewriting, classification) is a deliberate exception in which you pay double latency and cost. Accept that trade-off explicitly, not out of muscle memory.
- Never call a model in an event listener. Event listeners are telemetry sinks. Triggering model calls inside them creates unmonitored execution paths and breaks the lifecycle.
- Never call a model inside a tool, unless that tool is explicitly a sub-agent wrapping its own scoped TurnRunner.
The DispatchExecutorFn Contract
A custom executor must implement the DispatchExecutorFn signature:
import type { DispatchExecutorFn } from '@nhtio/adk'
const myExecutor: DispatchExecutorFn = async (ctx, helpers) => {
// Your code here
}
The interface definitions:
type DispatchExecutorFn = (
ctx: DispatchContext,
helpers: DispatchExecutorHelpers
) => void | Promise<void>
ctx: DispatchContext
The dispatch context provides the current turn state and controls the execution lifecycle. Key properties:
| Member | Type / Description |
|---|---|
| ctx.turnMessages | Set<Message> — The conversation history for this turn. |
| ctx.tools | ToolRegistry — The tools available. Use ctx.tools.all() to list them. |
| ctx.iteration | number — Zero-based count of how many times the executor has run this turn. |
| ctx.toolCallCount(checksum) | (checksum: string) => number — Execution frequency of a tool + args combo. Pass the same executor-defined checksum you persist for that tool call. |
| ctx.isSignalled | boolean — true if ack() or nack() has been called. |
| ctx.abortSignal | AbortSignal — The signal on the active DispatchContext, shared with the turn abort controller. |
| ctx.storeMessage(m) | (m: Message) => Promise<void> — Persist a message. |
| ctx.storeToolCall(tc) | (tc: ToolCall) => Promise<void> — Persist a tool call and its results. |
| ctx.ack() | () => void — Signal successful iteration completion. |
| ctx.nack(error) | (error?: Error) => void — Signal iteration failure. |
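
ctx.abortSignal is shared with the turn abort controller, so forwarding it to your HTTP client is how an executor honours turn aborts. If you also want a per-request timeout, one option is to derive a child signal that fires on either condition. This is a sketch, not ADK API. deriveRequestSignal is a hypothetical helper built only on the standard AbortController, and the timeout value is up to you. Runtimes that support AbortSignal.any and AbortSignal.timeout can express the same thing more tersely.

```typescript
interface RequestSignal {
  signal: AbortSignal
  dispose(): void // clears the timer and detaches from the parent signal
}

// Hypothetical helper: aborts when the parent (e.g. ctx.abortSignal) aborts
// or when timeoutMs elapses, whichever happens first.
function deriveRequestSignal(parent: AbortSignal, timeoutMs: number): RequestSignal {
  const controller = new AbortController()
  const onParentAbort = () => controller.abort(parent.reason)
  if (parent.aborted) {
    controller.abort(parent.reason)
  } else {
    parent.addEventListener('abort', onParentAbort, { once: true })
  }
  const timer = setTimeout(
    () => controller.abort(new Error(`Model request timed out after ${timeoutMs} ms`)),
    timeoutMs
  )
  return {
    signal: controller.signal,
    dispose() {
      clearTimeout(timer)
      parent.removeEventListener('abort', onParentAbort)
    },
  }
}
```

Pass the returned signal as fetch's signal option and call dispose() in a finally block so the pending timer does not keep the process alive.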
helpers: DispatchExecutorHelpers
The I/O interface for real-time telemetry and streaming:
| Method | Description |
|---|---|
| helpers.reportMessage(id, delta, opts?) | Stream text chunks to the functional event bus. |
| helpers.reportThought(id, delta, opts?) | Stream thinking/reasoning chunks (separate from message stream). |
| helpers.reportToolCall(id, partial) | Emit tool status (arguments, execution state, final result). |
| helpers.log | Structured logger (trace, debug, info, warn, error) bound to this turn. |
The Ack/Nack Invariant
Crucial Invariant
Call exactly one of ctx.ack() or ctx.nack(error) exactly once per executor invocation.
- Failing to signal (without throwing): The dispatch loops indefinitely unless middleware signals or aborts. ADK core implements no built-in timeouts or iteration caps.
- Calling both, or calling either twice: Throws E_LLM_EXECUTION_ALREADY_SIGNALLED.
- Unhandled exceptions: If your executor throws an unhandled error, dispatch rejects, ending as a nack-status error.
A try/catch block is highly recommended for clean, provider-specific cleanup and error formatting, but it is not technically required to prevent hangs caused by thrown errors.
import type { DispatchExecutorFn } from '@nhtio/adk'
const safeExecutor: DispatchExecutorFn = async (ctx, helpers) => {
try {
// Call your model here
ctx.ack()
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error))
ctx.nack(err)
}
}
Calling ctx.ack() marks the current iteration as successful. Calling ctx.nack(error) propagates the error to the observability bus, fires turnEnd, and terminates the turn.
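The invariant is strict, so it is worth asserting in unit tests. Below is a minimal sketch of a test double that covers only the signalling members. createFakeDispatchContext is hypothetical. It throws a plain Error whose message is the documented code E_LLM_EXECUTION_ALREADY_SIGNALLED, not ADK's own error class.

```typescript
type SignalStatus = 'pending' | 'acked' | 'nacked'

interface FakeDispatchContext {
  readonly isSignalled: boolean
  readonly status: SignalStatus
  readonly error: Error | undefined
  ack(): void
  nack(error?: Error): void
}

function createFakeDispatchContext(): FakeDispatchContext {
  let status: SignalStatus = 'pending'
  let error: Error | undefined
  // Any second signal (ack after ack, nack after ack, and so on) violates the invariant.
  const guard = (): void => {
    if (status !== 'pending') throw new Error('E_LLM_EXECUTION_ALREADY_SIGNALLED')
  }
  return {
    get isSignalled() {
      return status !== 'pending'
    },
    get status() {
      return status
    },
    get error() {
      return error
    },
    ack() {
      guard()
      status = 'acked'
    },
    nack(e?: Error) {
      guard()
      status = 'nacked'
      error = e
    },
  }
}
```

Pass it to an executor under test, extending it with any other members that executor reads. Once the returned promise settles, assert that status is 'acked' or 'nacked'.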
The Five Jobs of an Executor
A robust custom executor performs these five jobs in order:
1. Format the Prompt Context
Map ADK primitives to your provider's expected schema:
- Convert ctx.turnMessages into chat roles and content.
- Map ctx.tools.all() to the model's tool/function calling definitions.
- Inspect ctx.iteration to inject corrective prompting if the agent is stuck.
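Here is a minimal sketch of the message-mapping and iteration-check parts of job 1 for a chat-completions-style provider. Tool definitions are converted separately, as in the tool-capable example later on this page. TurnMessage is a simplified stand-in that treats content as an optional string, and ADK's Message may differ. The threshold and the wording of the corrective message are hypothetical.

```typescript
// Simplified stand-in for ADK's Message (assumption: real content may not be a plain string).
interface TurnMessage {
  role: 'system' | 'user' | 'assistant'
  content: string | null
}

interface ProviderMessage {
  role: 'system' | 'user' | 'assistant'
  content: string
}

// Hypothetical values: tune the threshold and wording for your model.
const STUCK_THRESHOLD = 3
const CORRECTIVE_NUDGE =
  'You appear to be repeating steps without progress. Use what you already know and give a final answer.'

function formatPrompt(turnMessages: Iterable<TurnMessage>, iteration: number): ProviderMessage[] {
  const messages: ProviderMessage[] = Array.from(turnMessages, (m) => ({
    role: m.role,
    content: m.content ?? '',
  }))
  // Late iterations get an extra system message steering the model toward an answer.
  if (iteration >= STUCK_THRESHOLD) {
    messages.push({ role: 'system', content: CORRECTIVE_NUDGE })
  }
  return messages
}
```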
2. Call the Model Client
Invoke your provider. While streaming is a product-level choice and not mandated by ADK, most interactive applications should stream chunks to minimize perceived latency.
3. Stream Telemetry via Helpers
Relay incoming chunks immediately to notify consumers in real time:
- Use helpers.reportMessage(messageId, chunk) for text content.
- Use helpers.reportThought(messageId, chunk) for reasoning/thinking blocks.
- Use helpers.reportToolCall(callId, { tool, args }) for tool call streaming.
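Relaying chunks in real time means parsing the stream incrementally. The sketch below isolates the buffering that both streaming examples on this page perform inline: append the decoded text, split on newlines, carry the trailing partial line forward, and parse each data: line. drainSseLines is a hypothetical name. The sketch assumes one JSON object per data: line and a [DONE] sentinel, which is how chat-completions-style streams are typically framed.

```typescript
interface SseDrainResult {
  payloads: unknown[] // parsed JSON from each complete data: line
  rest: string // trailing partial line, to prepend to the next chunk
  done: boolean // true once the [DONE] sentinel has been seen
}

// Hypothetical helper: parse every complete line in buffer + chunk.
function drainSseLines(buffer: string, chunk: string): SseDrainResult {
  const lines = (buffer + chunk).split('\n')
  const rest = lines.pop() ?? ''
  const payloads: unknown[] = []
  let done = false
  for (const line of lines) {
    const trimmed = line.trim()
    // Blank separators, comments, and event:/id: fields carry no payload here.
    if (!trimmed.startsWith('data:')) continue
    const data = trimmed.slice(5).trim()
    if (data === '[DONE]') {
      done = true
      continue
    }
    payloads.push(JSON.parse(data))
  }
  return { payloads, rest, done }
}
```

Inside an executor, call it once per reader.read() result with decoder.decode(value, { stream: true }). Keep rest for the next call, and forward each payload's text delta to helpers.reportMessage(messageId, delta).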
4. Execute Tools Inline
If the model requests tool calls, execute them inside the executor:
- Fetch the tool instance: const tool = ctx.tools.get(toolName).
- Execute the tool, passing the context: const results = await tool.executor(ctx)(args).
- Report completion: helpers.reportToolCall(callId, { results, isComplete: true }).
- Persist the execution details: await ctx.storeToolCall(toolCall).
- Continue the loop. Append the tool results to your conversation history and call the model again.
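Job 4 comes down to this control flow: for each requested call, report it, look the tool up, run it with the context, report completion, persist it, and build the tool message to feed back to the model. The interfaces here (StoredToolCall, MiniCtx, MiniTool, MiniHelpers) are simplified stand-ins, not ADK's ToolRegistry or ToolCall classes. runToolCalls is hypothetical. A real executor would also set a checksum and timestamps, as the tool-capable example does.

```typescript
// Simplified stand-ins (assumption): ADK's ToolRegistry, tool executors, and
// ToolCall class have richer signatures than these.
interface StoredToolCall {
  id: string
  tool: string
  args: unknown
  results: unknown
  isComplete: boolean
}
interface MiniCtx {
  tools: { get(name: string): MiniTool | undefined }
  storeToolCall(tc: StoredToolCall): Promise<void>
}
interface MiniTool {
  executor(ctx: MiniCtx): (args: unknown) => Promise<unknown>
}
interface MiniHelpers {
  reportToolCall(id: string, partial: Partial<StoredToolCall>): void
}
// One tool call as reassembled from the model stream.
interface RequestedCall {
  id: string
  name: string
  argsJson: string
}
interface ToolResultMessage {
  role: 'tool'
  tool_call_id: string
  content: string
}

// Hypothetical helper: runs calls sequentially and returns the tool messages
// to append to the conversation before calling the model again.
async function runToolCalls(
  ctx: MiniCtx,
  helpers: MiniHelpers,
  calls: RequestedCall[]
): Promise<ToolResultMessage[]> {
  const followUps: ToolResultMessage[] = []
  for (const call of calls) {
    const args: unknown = call.argsJson ? JSON.parse(call.argsJson) : {}
    helpers.reportToolCall(call.id, { tool: call.name, args })
    const tool = ctx.tools.get(call.name)
    if (!tool) throw new Error(`Tool not found: ${call.name}`)
    const results = await tool.executor(ctx)(args)
    helpers.reportToolCall(call.id, { results, isComplete: true })
    await ctx.storeToolCall({ id: call.id, tool: call.name, args, results, isComplete: true })
    followUps.push({
      role: 'tool',
      tool_call_id: call.id,
      content: typeof results === 'string' ? results : JSON.stringify(results),
    })
  }
  return followUps
}
```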
5. Finalize the Iteration
If the model yields a final text response (no further tool calls):
- Persist the final message: await ctx.storeMessage(finalMessage).
- Close the loop: ctx.ack().
Ack Should Be Last
Treat ctx.ack() as the final lifecycle signal by convention. It does not disable ctx.store*, and awaited writes before the executor returns are still flushed, but keeping persistence before ack() makes the iteration contract obvious.
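Job 5 and the ack-last convention fit in a small helper: close the live stream, persist the message, then acknowledge. FinalizeContext and FinalizeHelpers are simplified stand-ins for the relevant members of DispatchContext and DispatchExecutorHelpers. FinalMessage stands in for ADK's Message class, and finalizeIteration is hypothetical.

```typescript
// Simplified stand-ins (assumption): only the members this helper touches.
interface FinalMessage {
  id: string
  role: 'assistant'
  content: string
}
interface FinalizeContext {
  storeMessage(m: FinalMessage): Promise<void>
  ack(): void
}
interface FinalizeHelpers {
  reportMessage(id: string, delta: string, opts?: { isComplete?: boolean }): void
}

async function finalizeIteration(
  ctx: FinalizeContext,
  helpers: FinalizeHelpers,
  messageId: string,
  text: string
): Promise<void> {
  // 1. Reporting: tell live consumers the message stream is finished.
  helpers.reportMessage(messageId, '', { isComplete: true })
  // 2. Storing: make the reply durable so later history includes it.
  await ctx.storeMessage({ id: messageId, role: 'assistant', content: text })
  // 3. Signal last, once everything this iteration produced is persisted.
  ctx.ack()
}
```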
Reporting vs. Storing
Do not confuse streaming reporting with durable state persistence. They are separate operations.
| Action | Under the Hood | Consequences of Omitting |
|---|---|---|
| helpers.report* | Triggers the functional event stream. | The user or client UI receives no real-time updates and freezes. |
| ctx.store* | Invokes your configured storage callbacks. | The agent forgets the interaction instantly on the next iteration or turn. |
Perform both.
Reporting provides real-time client I/O. Storing ensures the model sees its own outputs and tool results in the next turn's message history.
Runaway Loop Detection
LLMs get stuck. They repeat failing tool calls, emit endless apologies, or loop through empty reasoning steps. ADK core does not impose built-in limits on execution iterations; loop boundary enforcement is the executor's responsibility.
ctx.iteration
Tracks the execution count of the current turn:
- 0: Initial user message.
- 1: First tool results returned to the model.
- 10+: High probability of an infinite loop.
ctx.toolCallCount(checksum)
Returns the number of times a tool + argument combination has been invoked during this turn. The checksum is executor-defined, but it must be the same fingerprint you store on the corresponding ToolCall. If a tool is called repeatedly with the same arguments, the model is failing to learn from its errors. Intervene immediately.
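Here is a sketch of acting on that count before a requested tool runs. LoopGuardContext is a stand-in that exposes only toolCallCount and nack. MAX_IDENTICAL_CALLS and rejectIfRepeating are hypothetical. The fingerprint is a simple JSON string with sorted top-level keys, whereas the tool-capable example on this page uses a SHA-256 digest of a fully canonicalized payload. Whichever you choose, persist the same value as the ToolCall checksum, or the counts will never match.

```typescript
// Stand-in exposing only the two DispatchContext members this guard needs.
interface LoopGuardContext {
  toolCallCount(checksum: string): number
  nack(error?: Error): void
}

// Hypothetical limit: identical tool + args calls tolerated per turn.
const MAX_IDENTICAL_CALLS = 2

// Executor-defined fingerprint. Top-level keys are sorted so argument order
// does not matter; nested objects are not canonicalized in this sketch.
function fingerprint(tool: string, args: Record<string, unknown>): string {
  const entries = Object.keys(args)
    .sort()
    .map((key) => [key, args[key]])
  return JSON.stringify([tool, entries])
}

// Nacks and returns true if this exact call has already been made
// MAX_IDENTICAL_CALLS times this turn; otherwise returns false.
function rejectIfRepeating(
  ctx: LoopGuardContext,
  tool: string,
  args: Record<string, unknown>
): boolean {
  if (ctx.toolCallCount(fingerprint(tool, args)) < MAX_IDENTICAL_CALLS) return false
  ctx.nack(new Error(`${tool} called ${MAX_IDENTICAL_CALLS}+ times with identical arguments`))
  return true
}
```

Call it after reassembling a tool call and before running it. When it returns true, return from the executor without signalling again, because nack has already been called.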
Example: Enforcing Iteration Caps
import type { DispatchExecutorFn } from '@nhtio/adk'
const MAX_ITERATIONS = 5
export const guardedExecutor: DispatchExecutorFn = async (ctx, helpers) => {
if (ctx.iteration >= MAX_ITERATIONS) {
ctx.nack(new Error(`Agent exceeded iteration threshold of ${MAX_ITERATIONS}`))
return
}
try {
// Model invocation loop here
ctx.ack()
} catch (error) {
ctx.nack(error instanceof Error ? error : new Error(String(error)))
}
}
Always place iteration guards at the very beginning of your executor function.
Executor Implementation Examples
Here are three complete custom executors, ranging from a stub for baseline testing to a fully realized tool-capable runtime.
import type { DispatchExecutorFn } from '@nhtio/adk'
/**
* Satisfies the runtime signature. Useful for isolating storage issues
* and testing harness pipeline wiring without calling an external LLM.
*/
export const stubExecutor: DispatchExecutorFn = async (ctx, _helpers) => {
ctx.ack()
}

import type { DispatchExecutorFn } from '@nhtio/adk'
import { Message } from '@nhtio/adk'
function requireEnv(name: string): string {
const value = process.env[name]
if (value === undefined || value.length === 0) {
throw new Error(`Missing required environment variable: ${name}`)
}
return value
}
/**
* Bare-metal executor speaking the OpenAI chat completions protocol.
* Uses native fetch streaming without heavy external dependencies.
*/
export const minimalExecutor: DispatchExecutorFn = async (ctx, helpers) => {
const messageId = crypto.randomUUID()
let text = ''
try {
const response = await fetch(requireEnv('CHAT_COMPLETIONS_ENDPOINT'), {
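method: 'POST',
// Forward the turn's abort signal so turn aborts and client disconnects cancel the request
signal: ctx.abortSignal,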
headers: {
Authorization: `Bearer ${requireEnv('API_KEY')}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: requireEnv('MODEL_ID'),
stream: true,
messages: [...ctx.turnMessages].map((m) => ({
role: m.role,
content: m.content?.toString() ?? '',
})),
}),
})
if (!response.ok || !response.body) {
throw new Error(`Model request failed with status: ${response.status}`)
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { value, done } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
for (const line of lines) {
const trimmed = line.trim()
if (!trimmed.startsWith('data:')) continue
const data = trimmed.slice(5).trim()
if (data === '[DONE]') continue
const event = JSON.parse(data)
const delta = event.choices?.[0]?.delta?.content
if (delta) {
text += delta
helpers.reportMessage(messageId, delta)
}
}
}
helpers.reportMessage(messageId, '', { isComplete: true })
const finalMessage = new Message({
id: messageId,
role: 'assistant',
content: text,
createdAt: new Date(),
updatedAt: new Date(),
})
await ctx.storeMessage(finalMessage)
ctx.ack()
} catch (error) {
ctx.nack(error instanceof Error ? error : new Error(String(error)))
}
}

import type { DispatchExecutorFn } from '@nhtio/adk'
import { Message, SpooledArtifact, ToolCall } from '@nhtio/adk'
import { InMemorySpoolStore } from '@nhtio/adk/batteries/storage/in_memory'
// One spool store per process is enough for this example; in production you'd
// inject a Flydrive / OPFS / your-own store. See `assembly/batteries-storage`.
const spoolStore = new InMemorySpoolStore()
type ChatRequestMessage =
| { role: 'system' | 'user'; content: string }
| { role: 'assistant'; content: string | null; tool_calls?: ChatRequestToolCall[] }
| { role: 'tool'; tool_call_id: string; content: string }
type ChatRequestToolCall = {
id: string
type: 'function'
function: { name: string; arguments: string }
}
type JsonSchemaPrimitive = string | number | boolean
type ProviderJsonSchema = {
type?: 'string' | 'number' | 'boolean' | 'object' | 'array'
description?: string
enum?: JsonSchemaPrimitive[]
required?: string[]
properties?: Record<string, ProviderJsonSchema>
items?: ProviderJsonSchema
}
function requireEnv(name: string): string {
const value = process.env[name]
if (value === undefined || value.length === 0) {
throw new Error(`Missing required environment variable: ${name}`)
}
return value
}
const isRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === 'object' && value !== null && !Array.isArray(value)
const readSchemaType = (value: unknown): ProviderJsonSchema['type'] => {
switch (value) {
case 'string':
case 'number':
case 'boolean':
case 'object':
case 'array':
return value
default:
return undefined
}
}
const readStringArray = (value: unknown): string[] | undefined => {
if (!Array.isArray(value)) return undefined
return value.every((item) => typeof item === 'string') ? value : undefined
}
const readPrimitiveArray = (value: unknown): JsonSchemaPrimitive[] | undefined => {
if (!Array.isArray(value)) return undefined
return value.every(
(item) =>
typeof item === 'string' ||
typeof item === 'number' ||
typeof item === 'boolean'
)
? value
: undefined
}
/**
* Example schema adapter for the simple field types ADK tool schemas declare.
* It converts string/number/boolean/object/array, description, required, and enum
* into a JSON Schema-compatible object for chat-completions-style providers.
*/
const convertToProviderJsonSchema = (schema: unknown): ProviderJsonSchema => {
if (!isRecord(schema)) return { type: 'object', properties: {} }
const jsonSchema: ProviderJsonSchema = {}
const type = readSchemaType(schema.type)
if (type) jsonSchema.type = type
if (typeof schema.description === 'string') {
jsonSchema.description = schema.description
}
const enumValues = readPrimitiveArray(schema.enum)
if (enumValues) {
jsonSchema.enum = enumValues
}
if (isRecord(schema.properties)) {
const properties: Record<string, ProviderJsonSchema> = {}
const requiredFromProperties: string[] = []
for (const [name, propertySchema] of Object.entries(schema.properties)) {
properties[name] = convertToProviderJsonSchema(propertySchema)
if (isRecord(propertySchema) && propertySchema.required === true) {
requiredFromProperties.push(name)
}
}
jsonSchema.type = 'object'
jsonSchema.properties = properties
const explicitRequired = readStringArray(schema.required)
const required = explicitRequired ?? requiredFromProperties
if (required.length > 0) {
jsonSchema.required = required
}
}
if (schema.items !== undefined) {
jsonSchema.type = 'array'
jsonSchema.items = convertToProviderJsonSchema(schema.items)
}
return jsonSchema
}
const canonicalStringify = (value: unknown): string => {
if (value === null || typeof value !== 'object') return JSON.stringify(value)
if (Array.isArray(value)) return `[${value.map(canonicalStringify).join(',')}]`
return `{${Object.entries(value as Record<string, unknown>)
.sort(([a], [b]) => a.localeCompare(b))
.map(([key, val]) => `${JSON.stringify(key)}:${canonicalStringify(val)}`)
.join(',')}}`
}
const computeToolCallChecksum = async (tool: string, args: unknown): Promise<string> => {
const bytes = new TextEncoder().encode(canonicalStringify({ tool, args }))
const digest = await crypto.subtle.digest('SHA-256', bytes)
return [...new Uint8Array(digest)]
.map((byte) => byte.toString(16).padStart(2, '0'))
.join('')
}
/**
* Complete custom loop driving tool execution. Executes tools inline,
* records results, and feeds outputs back into the model context.
*/
export const toolCapableExecutor: DispatchExecutorFn = async (ctx, helpers) => {
const messages: ChatRequestMessage[] = [...ctx.turnMessages].map((m): ChatRequestMessage => {
const content = m.content?.toString() ?? ''
if (m.role === 'assistant') return { role: 'assistant', content }
return { role: 'user', content }
})
try {
while (!ctx.isSignalled) {
const messageId = crypto.randomUUID()
let text = ''
const toolCallsByIndex = new Map<number, ChatRequestToolCall>()
const toolSchemas = ctx.tools.all().map((tool) => {
const described = tool.describe()
return {
type: 'function',
function: {
name: described.name,
description: described.description,
parameters: convertToProviderJsonSchema(described.inputSchema),
},
}
})
const response = await fetch(requireEnv('CHAT_COMPLETIONS_ENDPOINT'), {
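method: 'POST',
// Forward the turn's abort signal so turn aborts and client disconnects cancel the request
signal: ctx.abortSignal,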
headers: {
Authorization: `Bearer ${requireEnv('API_KEY')}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: requireEnv('MODEL_ID'),
stream: true,
messages,
tools: toolSchemas.length > 0 ? toolSchemas : undefined,
}),
})
if (!response.ok || !response.body) {
throw new Error(`Model request failed with status: ${response.status}`)
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { value, done } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
for (const line of lines) {
const trimmed = line.trim()
if (!trimmed.startsWith('data:')) continue
const data = trimmed.slice(5).trim()
if (data === '[DONE]') continue
const event = JSON.parse(data)
const delta = event.choices?.[0]?.delta
if (delta?.content) {
text += delta.content
helpers.reportMessage(messageId, delta.content)
}
for (const partial of delta?.tool_calls ?? []) {
const index = partial.index ?? 0
const existing = toolCallsByIndex.get(index) ?? {
id: partial.id ?? crypto.randomUUID(),
type: 'function',
function: { name: '', arguments: '' },
}
existing.id = partial.id ?? existing.id
existing.function.name = partial.function?.name ?? existing.function.name
existing.function.arguments += partial.function?.arguments ?? ''
toolCallsByIndex.set(index, existing)
}
}
}
const toolCalls: ChatRequestToolCall[] = [...toolCallsByIndex.values()]
// If no tools are requested, we're done with this turn
if (toolCalls.length === 0) {
helpers.reportMessage(messageId, '', { isComplete: true })
const finalMessage = new Message({
id: messageId,
role: 'assistant',
content: text,
createdAt: new Date(),
updatedAt: new Date(),
})
await ctx.storeMessage(finalMessage)
ctx.ack()
return
}
// Record assistant's intent to call tools in history
messages.push({
role: 'assistant',
content: text || null,
tool_calls: toolCalls,
})
// Execute all tool calls sequentially
for (const tc of toolCalls) {
const args = tc.function.arguments ? JSON.parse(tc.function.arguments) : {}
const toolName = tc.function.name
helpers.reportToolCall(tc.id, { tool: toolName, args })
const tool = ctx.tools.get(toolName)
if (!tool) throw new Error(`Tool not found: ${toolName}`)
// Execute passing context; the tool computes call ID internally
const rawResults = await tool.executor(ctx)(args)
const ArtifactCtor = tool.artifactConstructor?.() ?? SpooledArtifact
const results =
typeof rawResults === 'string' || rawResults instanceof Uint8Array
? new ArtifactCtor(spoolStore.write(tc.id, rawResults))
: rawResults
const completedAt = new Date()
const checksum = await computeToolCallChecksum(toolName, args)
helpers.reportToolCall(tc.id, { results, isComplete: true })
const persistedToolCall = new ToolCall({
id: tc.id,
checksum,
tool: toolName,
args,
results,
isError: false,
isComplete: true,
completedAt,
createdAt: new Date(),
updatedAt: completedAt,
})
await ctx.storeToolCall(persistedToolCall)
messages.push({
role: 'tool',
tool_call_id: tc.id,
content: typeof rawResults === 'string' ? rawResults : JSON.stringify(rawResults),
})
}
}
} catch (error) {
ctx.nack(error instanceof Error ? error : new Error(String(error)))
}
}
Common Failures
// WRONG — the turn hangs indefinitely because no signal is sent
const executor: DispatchExecutorFn = async (ctx, helpers) => {
const res = await callModel()
return
}
// RIGHT — always signal execution success
const executor: DispatchExecutorFn = async (ctx, helpers) => {
const res = await callModel()
ctx.ack()
}
// WRONG — catching an error without signaling nack causes an infinite hang
const executor: DispatchExecutorFn = async (ctx, helpers) => {
try {
await doWork()
ctx.ack()
} catch (e) {
console.error(e) // Swallowed! The engine sits waiting forever.
}
}
// RIGHT — propagate errors immediately via nack
const executor: DispatchExecutorFn = async (ctx, helpers) => {
try {
await doWork()
ctx.ack()
} catch (e) {
ctx.nack(e instanceof Error ? e : new Error(String(e)))
}
}
// WRONG — reasoning models belong strictly in the executor seam, not pipelines
const wrongPipelineStep = async (ctx: TurnContext, next: () => Promise<void>) => {
const res = await callModel({ /* ... */ })
await next()
}
// RIGHT — delegate intelligence to the executor, use pipelines for data loading
const executor: DispatchExecutorFn = async (ctx, helpers) => {
const res = await callModel({ /* ... */ })
ctx.ack()
}
The Reference Implementation
Before building your own parser or handling complex streaming states, read the OpenAIChatCompletionsAdapter source. It shows:
- The strict coordination between helpers.report*, ctx.store*, and ctx.ack().
- Accurate reassembly of partial, out-of-order SSE chunks.
- Three-tier configuration merging (constructor configuration -> runtime executor overrides -> per-turn overrides stored in the Registry via ctx.stash.get('...')).
- Forwarding ctx.abortSignal to handle timeouts and client disconnect aborts.
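A three-tier merge is easy to get subtly wrong: an override that is explicitly undefined should not erase a lower tier's value. Here is a minimal sketch that assumes flat option objects. AdapterConfig and its keys are hypothetical, because the adapter's real options are not listed here. The per-turn tier is passed in as a plain object instead of being read from the Registry via ctx.stash.

```typescript
// Hypothetical option shape. A type alias (not an interface) keeps it
// assignable to Record<string, unknown>.
type AdapterConfig = {
  model: string
  temperature?: number
  maxTokens?: number
}

// Copy only keys whose value is not undefined, so an unset override never
// erases a lower tier's value.
function definedOnly(partial: Partial<AdapterConfig> = {}): Partial<AdapterConfig> {
  const source: Record<string, unknown> = partial
  const out: Record<string, unknown> = {}
  for (const key of Object.keys(source)) {
    if (source[key] !== undefined) out[key] = source[key]
  }
  return out as Partial<AdapterConfig>
}

// Precedence, lowest to highest: constructor < executor overrides < per-turn.
function resolveConfig(
  constructorConfig: AdapterConfig,
  executorOverrides?: Partial<AdapterConfig>,
  turnOverrides?: Partial<AdapterConfig>
): AdapterConfig {
  return {
    ...constructorConfig,
    ...definedOnly(executorOverrides),
    ...definedOnly(turnOverrides),
  }
}
```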
A battery adapter is not a special case built on internal engine hacks. It implements the exact same DispatchExecutorFn contract you write by hand. See LLM batteries to learn how to deploy and configure them.
Rendering Media into Provider Content Blocks
If your custom executor allows the model to receive or generate media, satisfy three requirements:
- Map Media instances into provider-specific payloads. When media objects are supplied inside ToolCall.results or Message.attachments, the executor is responsible for converting them into the target API's structural blocks (e.g. base64-encoded inline blocks or streamed uploads), reading the payload via media.stream(), media.asBytes(), or media.asBase64().
- Handle unsupported modalities explicitly. Expose an explicit unsupportedMediaPolicy option. The native Chat Completions adapter implements three policies: 'throw', 'fallback-stash', and 'synthetic-description'. Default to 'throw' to avoid silently losing information.
- Respect Trust-Is-Content boundaries. Media.trustTier governs the trust envelope. A tool's trusted boolean must never override the trust level of an individual Media payload. See the Trust tiers → Media matrix for standard rendering rules.
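Here is a sketch of the unsupported-modality branch, with 'throw' as the fallthrough default. This page does not specify what the adapter's 'fallback-stash' and 'synthetic-description' policies emit, so the text placeholders below are assumptions. So are MediaLike (a stand-in for ADK's Media), the ContentBlock shapes, the image-only capability check, and the injected stash callback. Trust handling is deliberately omitted; it follows Media.trustTier as described above.

```typescript
type UnsupportedMediaPolicy = 'throw' | 'fallback-stash' | 'synthetic-description'

// Simplified stand-in for ADK's Media (assumption: the real class exposes more).
interface MediaLike {
  id: string
  mimeType: string
  byteLength: number
}

// Hypothetical provider content blocks.
type ContentBlock =
  | { type: 'text'; text: string }
  | { type: 'image'; mimeType: string; mediaId: string }

// Hypothetical capability check: this provider only ingests images natively.
const isNativelySupported = (media: MediaLike): boolean => media.mimeType.startsWith('image/')

function renderMedia(
  media: MediaLike,
  policy: UnsupportedMediaPolicy,
  stash: (media: MediaLike) => string // stores the payload, returns a reference
): ContentBlock {
  if (isNativelySupported(media)) {
    return { type: 'image', mimeType: media.mimeType, mediaId: media.id }
  }
  if (policy === 'fallback-stash') {
    return { type: 'text', text: `[${media.mimeType} attachment stored at ${stash(media)}]` }
  }
  if (policy === 'synthetic-description') {
    return {
      type: 'text',
      text: `[${media.mimeType} attachment, ${media.byteLength} bytes, not rendered for this model]`,
    }
  }
  // 'throw': refuse loudly instead of silently dropping information.
  throw new Error(`Provider cannot accept ${media.mimeType} (media ${media.id})`)
}
```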
Scanning is a Network Concern
Antivirus checking or DLP scanning of media payloads is a deployment-level infrastructure concern. Executors are transit layers: they forward media byte payloads and must not run internal scanning logic. Place scanners in ingress/egress middleware or as network proxy policies.