
Bring your own LLM

Core ADK ships with no default model client. There is no hidden model provider, no automatic API key resolution, and no fallback model. If you want a ready-to-use LLM runtime, opt in to one of the first-party batteries: OpenAIChatCompletionsAdapter or WebLLMChatCompletionsAdapter.

If you do not want to write a custom executor, you do not have to. You must still provide your TurnRunner callbacks, because LLM batteries only satisfy the executor seam (see Batteries). Wiring an off-the-shelf battery takes a single executorCallback entry:

typescript
import { TurnRunner } from '@nhtio/adk'
import { OpenAIChatCompletionsAdapter } from '@nhtio/adk/batteries/llm'

const runner = new TurnRunner({
  ...callbacks, // your own TurnRunner callbacks, still required when using a battery
  executorCallback: new OpenAIChatCompletionsAdapter({
    model: 'gpt-4o',
    apiKey: process.env.OPENAI_API_KEY,
    autoAck: true,
  }).executor(),
})

autoAck: true is required here. By default the executor does not call ctx.ack(), because the implementor owns turn completion; autoAck: true restores single-shot behavior.
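"Single-shot" means that one executor run ends the iteration. The sketch below illustrates that idea with a hypothetical `withAutoAck` wrapper, which acks after the wrapped executor returns unless the executor has already signalled. It is not the adapter's actual code. `MiniDispatchContext` and `MiniExecutorFn` are simplified stand-ins for ADK's `DispatchContext` and `DispatchExecutorFn`.

```typescript
// Simplified stand-ins for the ADK types involved (assumptions, not ADK exports).
interface MiniDispatchContext {
  readonly isSignalled: boolean
  ack(): void
  nack(error?: Error): void
}
type MiniExecutorFn = (ctx: MiniDispatchContext) => void | Promise<void>

// Hypothetical wrapper illustrating "single-shot": one executor run, then one ack.
function withAutoAck(executor: MiniExecutorFn): MiniExecutorFn {
  return async (ctx) => {
    await executor(ctx) // a throw propagates; dispatch treats it as a nack
    if (!ctx.isSignalled) ctx.ack() // skip if the executor already signalled
  }
}
```

The `isSignalled` check keeps the wrapper from violating the ack/nack invariant when the wrapped executor signals on its own.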

The executorCallback, whether you write it yourself or take it from a battery, is the single required seam between the ADK runtime and your choice of intelligence.

If your agent loops, hallucinates, or drops its connection, your executor is the first place to look. ADK provides the rails. You provide the model call, stream parser, tool loop, retry policy, and terminal signal.

The Executor Slot

ADK has zero interest in how decisions are made. It does not parse system instructions, format conversation history, or speak to APIs. The executor is a slot—a function that bridges ADK's DispatchContext to your LLM client, rules engine, or custom state machine.

The "model" doesn't have to be an LLM

The DispatchExecutorFn can wrap a model, a hardcoded decision tree, or a remote agent. The rest of the ADK runtime—turns, tools, state, event streaming—operates identically. See How agents work for details.
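As an illustration, the sketch below answers a turn with a hard-coded keyword rule instead of a model. It still reports, stores, and acks like any other executor. `MiniMessage`, `MiniCtx`, and `MiniHelpers` are hypothetical, simplified stand-ins for ADK's `Message`, `DispatchContext`, and helpers, reduced to the members this example touches.

```typescript
// Hypothetical, simplified shapes; ADK's real Message/DispatchContext are richer.
interface MiniMessage { id: string; role: 'user' | 'assistant'; content: string }
interface MiniCtx {
  turnMessages: Iterable<MiniMessage>
  storeMessage(m: MiniMessage): Promise<void>
  ack(): void
}
interface MiniHelpers {
  reportMessage(id: string, delta: string, opts?: { isComplete?: boolean }): void
}

// A decision tree standing in for "the model".
function decide(input: string): string {
  const text = input.toLowerCase()
  if (text.includes('refund')) return 'Routing you to the refunds team.'
  if (text.includes('hours')) return 'We are open 9:00-17:00, Monday to Friday.'
  return 'Sorry, I can only help with refunds and opening hours.'
}

// Same report/store/ack contract as an LLM-backed executor.
async function ruleExecutor(ctx: MiniCtx, helpers: MiniHelpers): Promise<void> {
  // Use the latest user message as the input to the rules.
  const lastUser = [...ctx.turnMessages].reverse().find((m) => m.role === 'user')
  const reply = decide(lastUser?.content ?? '')
  const id = `rule-${Date.now()}`
  helpers.reportMessage(id, reply)
  helpers.reportMessage(id, '', { isComplete: true })
  await ctx.storeMessage({ id, role: 'assistant', content: reply })
  ctx.ack()
}
```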

These rules are the boundary:

  • No primary reasoning loops in pipelines. Pipelines must not host the primary reasoning loop. Secondary preprocessing (e.g. query rewriting, classification) is a deliberate exception in which you pay the latency and cost of an extra model call. Accept that trade-off explicitly, not out of habit.
  • Never call a model in an event listener. Event listeners are telemetry sinks. Triggering model calls inside them creates unmonitored execution paths and breaks the lifecycle.
  • Never call a model inside a tool—unless that tool is explicitly a sub-agent wrapping its own scoped TurnRunner.

The DispatchExecutorFn Contract

A custom executor must implement the DispatchExecutorFn signature:

typescript
import type { DispatchExecutorFn } from '@nhtio/adk'

const myExecutor: DispatchExecutorFn = async (ctx, helpers) => {
  // Your code here; it must call ctx.ack() or ctx.nack(error) exactly once
}

The interface definitions:

typescript
type DispatchExecutorFn = (
  ctx: DispatchContext,
  helpers: DispatchExecutorHelpers
) => void | Promise<void>

ctx: DispatchContext

The dispatch context provides the current turn state and controls the execution lifecycle. Key properties:

| Member | Type | Description |
| --- | --- | --- |
| ctx.turnMessages | Set<Message> | The conversation history for this turn. |
| ctx.tools | ToolRegistry | The tools available. Use ctx.tools.all() to list them. |
| ctx.iteration | number | Zero-based count of how many times the executor has run this turn. |
| ctx.toolCallCount(checksum) | (checksum: string) => number | Execution frequency of a tool + args combination. Pass the same executor-defined checksum you persist for that tool call. |
| ctx.isSignalled | boolean | true if ack() or nack() has been called. |
| ctx.abortSignal | AbortSignal | The signal on the active DispatchContext, shared with the turn abort controller. |
| ctx.storeMessage(m) | (m: Message) => Promise<void> | Persist a message. |
| ctx.storeToolCall(tc) | (tc: ToolCall) => Promise<void> | Persist a tool call and its results. |
| ctx.ack() | () => void | Signal successful iteration completion. |
| ctx.nack(error) | (error?: Error) => void | Signal iteration failure. |

helpers: DispatchExecutorHelpers

The I/O interface for real-time telemetry and streaming:

| Method | Description |
| --- | --- |
| helpers.reportMessage(id, delta, opts?) | Stream text chunks to the functional event bus. |
| helpers.reportThought(id, delta, opts?) | Stream thinking/reasoning chunks (separate from the message stream). |
| helpers.reportToolCall(id, partial) | Emit tool status (arguments, execution state, final result). |
| helpers.log | Structured logger (trace, debug, info, warn, error) bound to this turn. |
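When you unit-test an executor, a recording stand-in for these helpers makes streamed output easy to assert on. This sketch mirrors the method names listed above. The `opts` and `partial` shapes (`isComplete`, `Record<string, unknown>`) are assumptions, and `log` is stubbed with no-op functions. Check ADK's `DispatchExecutorHelpers` for the real types.

```typescript
// Assumed option/partial shapes; not ADK's published types.
type ReportOpts = { isComplete?: boolean }
type LogFn = (message: string, data?: unknown) => void

interface RecordedEvent {
  kind: 'message' | 'thought' | 'toolCall'
  id: string
  payload: unknown
}

// Builds a helpers-like object that records every report call in order.
function createRecordingHelpers() {
  const events: RecordedEvent[] = []
  const noop: LogFn = () => {}
  const helpers = {
    reportMessage: (id: string, delta: string, opts?: ReportOpts) =>
      events.push({ kind: 'message', id, payload: { delta, ...opts } }),
    reportThought: (id: string, delta: string, opts?: ReportOpts) =>
      events.push({ kind: 'thought', id, payload: { delta, ...opts } }),
    reportToolCall: (id: string, partial: Record<string, unknown>) =>
      events.push({ kind: 'toolCall', id, payload: partial }),
    log: { trace: noop, debug: noop, info: noop, warn: noop, error: noop },
  }
  // Concatenates the streamed text for one message id.
  const textOf = (id: string) =>
    events
      .filter((e) => e.kind === 'message' && e.id === id)
      .map((e) => (e.payload as { delta: string }).delta)
      .join('')
  return { helpers, events, textOf }
}
```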

The Ack/Nack Invariant

Crucial Invariant

Call either ctx.ack() or ctx.nack(error), and only once, per executor invocation.

  • Failing to signal (without throwing): the dispatch loops indefinitely unless middleware signals or aborts the turn. ADK core implements no built-in timeouts or iteration caps.
  • Calling both, or calling either twice: the second call throws E_LLM_EXECUTION_ALREADY_SIGNALLED.
  • Unhandled exceptions: if your executor throws, the dispatch rejects and ends with a nack-status error.
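In tests, the invariant is easy to check with a fake context that records the signal and refuses a second one. This is a hypothetical test double, not ADK's implementation. It throws a plain `Error` whose message reuses the `E_LLM_EXECUTION_ALREADY_SIGNALLED` code, whereas the real error type may differ.

```typescript
type Outcome = { kind: 'ack' } | { kind: 'nack'; error?: Error }

// Hypothetical test double for the signalling part of DispatchContext.
class FakeSignalContext {
  private outcome: Outcome | undefined = undefined

  get isSignalled(): boolean {
    return this.outcome !== undefined
  }

  get result(): Outcome | undefined {
    return this.outcome
  }

  ack(): void {
    this.assertUnsignalled()
    this.outcome = { kind: 'ack' }
  }

  nack(error?: Error): void {
    this.assertUnsignalled()
    this.outcome = { kind: 'nack', error }
  }

  // Mirrors the rule: calling both, or either twice, is an error.
  private assertUnsignalled(): void {
    if (this.outcome !== undefined) {
      throw new Error('E_LLM_EXECUTION_ALREADY_SIGNALLED')
    }
  }
}
```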

A try/catch block is strongly recommended so you can run provider-specific cleanup and format errors before calling nack. It is not strictly required to prevent hangs, because a thrown error already ends the dispatch as a nack.

typescript
import type { DispatchExecutorFn } from '@nhtio/adk'

const safeExecutor: DispatchExecutorFn = async (ctx, helpers) => {
  try {
    // Call your model here
    ctx.ack()
  } catch (error) {
    const err = error instanceof Error ? error : new Error(String(error))
    ctx.nack(err)
  }
}

Calling ctx.ack() marks the current iteration as successful. Calling ctx.nack(error) propagates the error to the observability bus, fires turnEnd, and terminates the turn.
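Every example on this page repeats `error instanceof Error ? error : new Error(String(error))` before calling `nack`, because a `catch` clause can receive any thrown value. A small helper keeps that normalization in one place. `toError` is a hypothetical name, not an ADK export, and the JSON fallback for plain objects is a choice made for this sketch.

```typescript
// Normalizes any thrown value into an Error suitable for ctx.nack().
function toError(thrown: unknown): Error {
  if (thrown instanceof Error) return thrown
  // Plain objects are serialized so their fields survive into the message.
  if (typeof thrown === 'object' && thrown !== null) {
    try {
      return new Error(JSON.stringify(thrown))
    } catch {
      // Circular structures cannot be serialized; fall through to String().
    }
  }
  return new Error(String(thrown))
}
```

Inside an executor, the catch block then becomes `ctx.nack(toError(error))`.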

The Five Jobs of an Executor

A robust custom executor performs these five jobs in sequence:

1. Format the Prompt Context

Map ADK primitives to your provider's expected schema:

  • Convert ctx.turnMessages into chat roles and content.
  • Map ctx.tools.all() to the model's tool/function calling definitions.
  • Inspect ctx.iteration to inject corrective prompting if the agent is stuck.
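Here is a sketch of job 1 for a chat-completions-style provider. It assumes simplified shapes for messages and tool descriptions: `PromptMessage` and `ToolDescription` are hypothetical stand-ins for ADK's `Message` and the output of `tool.describe()`. The iteration threshold of 3 for the corrective system note is arbitrary.

```typescript
// Hypothetical, simplified inputs standing in for ctx.turnMessages and tool.describe().
interface PromptMessage { role: 'system' | 'user' | 'assistant'; content: string }
interface ToolDescription { name: string; description: string; inputSchema: object }

interface ChatRequestParts {
  messages: { role: string; content: string }[]
  tools?: { type: 'function'; function: { name: string; description: string; parameters: object } }[]
}

const CORRECTIVE_AFTER = 3 // arbitrary threshold for this sketch

function formatPromptContext(
  turnMessages: Iterable<PromptMessage>,
  tools: ToolDescription[],
  iteration: number,
): ChatRequestParts {
  const messages = [...turnMessages].map((m) => ({ role: m.role, content: m.content }))

  // Nudge a model that keeps iterating without converging.
  if (iteration >= CORRECTIVE_AFTER) {
    messages.push({
      role: 'system',
      content: 'You have made several attempts. Answer with what you have or explain what is blocking you.',
    })
  }

  const toolDefs = tools.map((t) => ({
    type: 'function' as const,
    function: { name: t.name, description: t.description, parameters: t.inputSchema },
  }))
  // Omit the tools field when empty; some providers reject an empty array.
  return toolDefs.length > 0 ? { messages, tools: toolDefs } : { messages }
}
```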

2. Call the Model Client

Invoke your provider. While streaming is a product-level choice and not mandated by ADK, most interactive applications should stream chunks to minimize perceived latency.
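Here is a minimal sketch of job 2, assuming an OpenAI-compatible chat completions endpoint called with `fetch`. Passing `ctx.abortSignal` as the `signal` option lets a turn abort cancel the in-flight HTTP request. `buildChatRequest` is a hypothetical helper, and the key and model values are placeholders you supply.

```typescript
// Hypothetical helper: builds fetch() options for a streaming chat completions call.
function buildChatRequest(opts: {
  apiKey: string
  model: string
  messages: { role: string; content: string }[]
  signal: AbortSignal // pass ctx.abortSignal here
}) {
  return {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${opts.apiKey}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ model: opts.model, stream: true, messages: opts.messages }),
    // Aborting the signal makes the pending fetch reject, so the executor's catch runs.
    signal: opts.signal,
  }
}

// Usage inside an executor (endpoint and values are placeholders):
// const res = await fetch(endpoint, buildChatRequest({ apiKey, model, messages, signal: ctx.abortSignal }))
```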

3. Stream Telemetry via Helpers

Relay incoming chunks immediately to notify consumers in real time:

  • Use helpers.reportMessage(messageId, chunk) for text content.
  • Use helpers.reportThought(messageId, chunk) for reasoning/thinking blocks.
  • Use helpers.reportToolCall(callId, { tool, args }) for tool call streaming.
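The streaming examples on this page split the SSE stream inline. The same logic is easier to test when it is pulled out into a small stateful parser. This sketch assumes OpenAI-style `data:` lines. The `reasoning_content` field used for thoughts is an assumption: some OpenAI-compatible providers use it, but not every API does.

```typescript
// One parsed delta, ready to forward to the matching helpers.report* call.
type StreamDelta =
  | { kind: 'text'; text: string }
  | { kind: 'thought'; text: string }
  | { kind: 'toolCall'; index: number; id?: string; name?: string; argsChunk: string }

// Accumulates raw chunks and parses complete `data:` lines.
class SseDeltaParser {
  private buffer = ''
  done = false

  push(chunk: string): StreamDelta[] {
    this.buffer += chunk
    const lines = this.buffer.split('\n')
    this.buffer = lines.pop() ?? '' // keep the trailing partial line for the next chunk
    const out: StreamDelta[] = []
    for (const line of lines) {
      const trimmed = line.trim()
      if (!trimmed.startsWith('data:')) continue
      const data = trimmed.slice(5).trim()
      if (data === '[DONE]') {
        this.done = true
        continue
      }
      const delta = JSON.parse(data).choices?.[0]?.delta
      if (delta?.content) out.push({ kind: 'text', text: delta.content })
      // Assumed field: only some OpenAI-compatible providers stream reasoning this way.
      if (delta?.reasoning_content) out.push({ kind: 'thought', text: delta.reasoning_content })
      for (const tc of delta?.tool_calls ?? []) {
        out.push({
          kind: 'toolCall',
          index: tc.index ?? 0,
          id: tc.id,
          name: tc.function?.name,
          argsChunk: tc.function?.arguments ?? '',
        })
      }
    }
    return out
  }
}
```

In an executor, pass each decoded chunk to `push` and route the results. Send text to `helpers.reportMessage`, thoughts to `helpers.reportThought`, and tool-call fragments to an accumulator keyed by `index`.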

4. Execute Tools Inline

If the model requests tool calls, resolve them inside the executor:

  1. Fetch the tool instance: const tool = ctx.tools.get(toolName).
  2. Execute the tool, passing the context: const results = await tool.executor(ctx)(args).
  3. Report completion: helpers.reportToolCall(callId, { results, isComplete: true }).
  4. Persist the execution details: await ctx.storeToolCall(toolCall).
  5. Continue the loop. Append the tool results to your conversation history and call the model again.
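The five steps above can be condensed into one function. This is a sketch: `MiniTool`, the `Map`-backed registry, and the `ToolStep` return shape are hypothetical simplifications of ADK's `ToolRegistry` and `tool.executor(ctx)(args)`. Reporting and persistence are passed in as callbacks so the example runs without ADK.

```typescript
// Hypothetical simplification of an ADK tool: executor(ctx) returns the callable.
interface MiniTool<Ctx> {
  executor(ctx: Ctx): (args: Record<string, unknown>) => Promise<unknown>
}

interface ToolStep { role: 'tool'; tool_call_id: string; content: string }

async function runToolCall<Ctx>(
  ctx: Ctx,
  registry: Map<string, MiniTool<Ctx>>,
  call: { id: string; name: string; arguments: string },
  report: (id: string, partial: Record<string, unknown>) => void,
  persist: (record: { id: string; tool: string; args: unknown; results: unknown }) => Promise<void>,
): Promise<ToolStep> {
  const args = call.arguments ? JSON.parse(call.arguments) : {}
  report(call.id, { tool: call.name, args })

  // 1. Fetch the tool; an unknown name is an executor error, not a silent skip.
  const tool = registry.get(call.name)
  if (!tool) throw new Error(`Tool not found: ${call.name}`)

  // 2. Execute with the dispatch context.
  const results = await tool.executor(ctx)(args)

  // 3. Report completion, then 4. persist the call.
  report(call.id, { results, isComplete: true })
  await persist({ id: call.id, tool: call.name, args, results })

  // 5. Return the message to append before calling the model again.
  return {
    role: 'tool',
    tool_call_id: call.id,
    content: typeof results === 'string' ? results : JSON.stringify(results),
  }
}
```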

5. Finalize the Iteration

If the model yields a final text response (no further tool calls):

  1. Persist the final message: await ctx.storeMessage(finalMessage).
  2. Close the loop: ctx.ack().

Ack Should Be Last

By convention, treat ctx.ack() as the final lifecycle signal. Calling it does not disable ctx.store*, and writes you await before the executor returns are still flushed, but persisting before ack() keeps the iteration contract obvious.
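The ordering in job 5 fits in a small function. This sketch assumes simplified, hypothetical `ctx` and `helpers` shapes, reduced to the three calls involved. It closes the stream first (real-time reporting), then awaits persistence (durable storage), and acks last.

```typescript
interface FinalMessage { id: string; role: 'assistant'; content: string }

// Hypothetical minimal shapes for the calls job 5 needs.
interface FinalizeCtx {
  storeMessage(m: FinalMessage): Promise<void>
  ack(): void
}
interface FinalizeHelpers {
  reportMessage(id: string, delta: string, opts?: { isComplete?: boolean }): void
}

async function finalizeIteration(
  ctx: FinalizeCtx,
  helpers: FinalizeHelpers,
  messageId: string,
  text: string,
): Promise<void> {
  helpers.reportMessage(messageId, '', { isComplete: true }) // real-time: stream finished
  await ctx.storeMessage({ id: messageId, role: 'assistant', content: text }) // durable
  ctx.ack() // lifecycle signal goes last
}
```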


Reporting vs. Storing

Do not confuse streaming reporting with durable state persistence. They are separate operations.

| Action | Under the hood | Consequence of omitting |
| --- | --- | --- |
| helpers.report* | Triggers the functional event stream. | The user or client UI receives no real-time updates and appears frozen. |
| ctx.store* | Invokes your configured storage callbacks. | The agent forgets the interaction by the next iteration or turn. |

Perform both.

Reporting provides real-time client I/O. Storing ensures the model sees its own outputs and tool results in the next turn's message history.


Runaway Loop Detection

LLMs get stuck. They repeat failing tool calls, emit endless apologies, or loop through empty reasoning steps. ADK core does not impose built-in limits on execution iterations; loop boundary enforcement is the executor's responsibility.

ctx.iteration

Tracks the execution count of the current turn:

  • 0: Initial user message.
  • 1: First tool results returned to the model.
  • 10+: High probability of an infinite loop.

ctx.toolCallCount(checksum)

Returns the number of times a tool + argument combination has been invoked during this turn. The checksum is executor-defined, but it must be the same fingerprint you store on the corresponding ToolCall. If a tool is called repeatedly with the same arguments, the model is failing to learn from the errors. Intervene immediately.
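The sketch below shows how an executor might act on `ctx.toolCallCount` before running a tool. The threshold of 2 prior identical calls is arbitrary, and `checkRepeat` is a hypothetical helper. `toolCallCount` is passed in as a plain function so the example runs without ADK. The checksum must be the same fingerprint stored on the `ToolCall`, as described above.

```typescript
type RepeatVerdict =
  | { action: 'run' }
  | { action: 'refuse'; note: string }

const MAX_IDENTICAL_CALLS = 2 // arbitrary for this sketch

// Decides whether to run a tool call, given how often this exact fingerprint already ran.
function checkRepeat(
  toolCallCount: (checksum: string) => number,
  checksum: string,
  toolName: string,
): RepeatVerdict {
  const priorRuns = toolCallCount(checksum)
  if (priorRuns < MAX_IDENTICAL_CALLS) return { action: 'run' }
  // Instead of executing again, hand the model a corrective tool result.
  return {
    action: 'refuse',
    note: `${toolName} was already called ${priorRuns} times with these exact arguments. Change the arguments or answer without it.`,
  }
}
```

When the verdict is `refuse`, you can either append `note` as the tool message instead of executing the tool, or `nack` the turn outright. Which fits depends on your product.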

Example: Enforcing Iteration Caps

typescript
import type { DispatchExecutorFn } from '@nhtio/adk'

const MAX_ITERATIONS = 5

export const guardedExecutor: DispatchExecutorFn = async (ctx, helpers) => {
  if (ctx.iteration >= MAX_ITERATIONS) {
    ctx.nack(new Error(`Agent exceeded iteration threshold of ${MAX_ITERATIONS}`))
    return
  }

  try {
    // Model invocation loop here
    ctx.ack()
  } catch (error) {
    ctx.nack(error instanceof Error ? error : new Error(String(error)))
  }
}

Always place iteration guards at the very beginning of your executor function.


Executor Implementation Examples

Here are three complete custom executors, ranging from a stub for baseline testing to a fully realized tool-capable runtime.

typescript
import type { DispatchExecutorFn } from '@nhtio/adk'

/**
 * Satisfies the runtime signature. Useful for isolating storage issues
 * and testing harness pipeline wiring without calling an external LLM.
 */
export const stubExecutor: DispatchExecutorFn = async (ctx, _helpers) => {
  ctx.ack()
}
typescript
import type { DispatchExecutorFn } from '@nhtio/adk'
import { Message } from '@nhtio/adk'

function requireEnv(name: string): string {
  const value = process.env[name]
  if (value === undefined || value.length === 0) {
    throw new Error(`Missing required environment variable: ${name}`)
  }
  return value
}

/**
 * Bare-metal executor speaking the OpenAI chat completions protocol.
 * Uses native fetch streaming without heavy external dependencies.
 */
export const minimalExecutor: DispatchExecutorFn = async (ctx, helpers) => {
  const messageId = crypto.randomUUID()
  let text = ''

  try {
    const response = await fetch(requireEnv('CHAT_COMPLETIONS_ENDPOINT'), {
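      method: 'POST',
      // Forward the turn's abort signal so aborts and disconnects cancel the request.
      signal: ctx.abortSignal,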
      headers: {
        Authorization: `Bearer ${requireEnv('API_KEY')}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: requireEnv('MODEL_ID'),
        stream: true,
        messages: [...ctx.turnMessages].map((m) => ({
          role: m.role,
          content: m.content?.toString() ?? '',
        })),
      }),
    })

    if (!response.ok || !response.body) {
      throw new Error(`Model request failed with status: ${response.status}`)
    }

    const reader = response.body.getReader()
    const decoder = new TextDecoder()
    let buffer = ''

    while (true) {
      const { value, done } = await reader.read()
      if (done) break

      buffer += decoder.decode(value, { stream: true })
      const lines = buffer.split('\n')
      buffer = lines.pop() ?? ''

      for (const line of lines) {
        const trimmed = line.trim()
        if (!trimmed.startsWith('data:')) continue

        const data = trimmed.slice(5).trim()
        if (data === '[DONE]') continue

        const event = JSON.parse(data)
        const delta = event.choices?.[0]?.delta?.content
        if (delta) {
          text += delta
          helpers.reportMessage(messageId, delta)
        }
      }
    }

    helpers.reportMessage(messageId, '', { isComplete: true })

    const finalMessage = new Message({
      id: messageId,
      role: 'assistant',
      content: text,
      createdAt: new Date(),
      updatedAt: new Date(),
    })

    await ctx.storeMessage(finalMessage)
    ctx.ack()
  } catch (error) {
    ctx.nack(error instanceof Error ? error : new Error(String(error)))
  }
}
typescript
import type { DispatchExecutorFn } from '@nhtio/adk'
import { Message, SpooledArtifact, ToolCall } from '@nhtio/adk'
import { InMemorySpoolStore } from '@nhtio/adk/batteries/storage/in_memory'

// One spool store per process is enough for this example; in production you'd
// inject a Flydrive / OPFS / your-own store. See `assembly/batteries-storage`.
const spoolStore = new InMemorySpoolStore()

type ChatRequestMessage =
  | { role: 'system' | 'user'; content: string }
  | { role: 'assistant'; content: string | null; tool_calls?: ChatRequestToolCall[] }
  | { role: 'tool'; tool_call_id: string; content: string }

type ChatRequestToolCall = {
  id: string
  type: 'function'
  function: { name: string; arguments: string }
}

type JsonSchemaPrimitive = string | number | boolean

type ProviderJsonSchema = {
  type?: 'string' | 'number' | 'boolean' | 'object' | 'array'
  description?: string
  enum?: JsonSchemaPrimitive[]
  required?: string[]
  properties?: Record<string, ProviderJsonSchema>
  items?: ProviderJsonSchema
}

function requireEnv(name: string): string {
  const value = process.env[name]
  if (value === undefined || value.length === 0) {
    throw new Error(`Missing required environment variable: ${name}`)
  }
  return value
}

const isRecord = (value: unknown): value is Record<string, unknown> =>
  typeof value === 'object' && value !== null && !Array.isArray(value)

const readSchemaType = (value: unknown): ProviderJsonSchema['type'] => {
  switch (value) {
    case 'string':
    case 'number':
    case 'boolean':
    case 'object':
    case 'array':
      return value
    default:
      return undefined
  }
}

const readStringArray = (value: unknown): string[] | undefined => {
  if (!Array.isArray(value)) return undefined
  return value.every((item) => typeof item === 'string') ? value : undefined
}

const readPrimitiveArray = (value: unknown): JsonSchemaPrimitive[] | undefined => {
  if (!Array.isArray(value)) return undefined
  return value.every(
    (item) =>
      typeof item === 'string' ||
      typeof item === 'number' ||
      typeof item === 'boolean'
  )
    ? value
    : undefined
}

/**
 * Example schema adapter for the simple field types ADK tool schemas declare.
 * It converts string/number/boolean/object/array, description, required, and enum
 * into a JSON Schema-compatible object for chat-completions-style providers.
 */
const convertToProviderJsonSchema = (schema: unknown): ProviderJsonSchema => {
  if (!isRecord(schema)) return { type: 'object', properties: {} }

  const jsonSchema: ProviderJsonSchema = {}
  const type = readSchemaType(schema.type)
  if (type) jsonSchema.type = type

  if (typeof schema.description === 'string') {
    jsonSchema.description = schema.description
  }

  const enumValues = readPrimitiveArray(schema.enum)
  if (enumValues) {
    jsonSchema.enum = enumValues
  }

  if (isRecord(schema.properties)) {
    const properties: Record<string, ProviderJsonSchema> = {}
    const requiredFromProperties: string[] = []

    for (const [name, propertySchema] of Object.entries(schema.properties)) {
      properties[name] = convertToProviderJsonSchema(propertySchema)

      if (isRecord(propertySchema) && propertySchema.required === true) {
        requiredFromProperties.push(name)
      }
    }

    jsonSchema.type = 'object'
    jsonSchema.properties = properties

    const explicitRequired = readStringArray(schema.required)
    const required = explicitRequired ?? requiredFromProperties
    if (required.length > 0) {
      jsonSchema.required = required
    }
  }

  if (schema.items !== undefined) {
    jsonSchema.type = 'array'
    jsonSchema.items = convertToProviderJsonSchema(schema.items)
  }

  return jsonSchema
}

const canonicalStringify = (value: unknown): string => {
  if (value === null || typeof value !== 'object') return JSON.stringify(value)
  if (Array.isArray(value)) return `[${value.map(canonicalStringify).join(',')}]`

  return `{${Object.entries(value as Record<string, unknown>)
    .sort(([a], [b]) => a.localeCompare(b))
    .map(([key, val]) => `${JSON.stringify(key)}:${canonicalStringify(val)}`)
    .join(',')}}`
}

const computeToolCallChecksum = async (tool: string, args: unknown): Promise<string> => {
  const bytes = new TextEncoder().encode(canonicalStringify({ tool, args }))
  const digest = await crypto.subtle.digest('SHA-256', bytes)
  return [...new Uint8Array(digest)]
    .map((byte) => byte.toString(16).padStart(2, '0'))
    .join('')
}

/**
 * Complete custom loop driving tool execution. Executes tools inline,
 * records results, and feeds outputs back into the model context.
 */
export const toolCapableExecutor: DispatchExecutorFn = async (ctx, helpers) => {
  const messages: ChatRequestMessage[] = [...ctx.turnMessages].map((m): ChatRequestMessage => {
    const content = m.content?.toString() ?? ''
    if (m.role === 'assistant') return { role: 'assistant', content }
    return { role: 'user', content }
  })

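  try {
    // ADK core imposes no iteration cap, so bound this in-executor model/tool loop.
    const maxRounds = 8 // arbitrary limit for this example
    let rounds = 0
    while (!ctx.isSignalled) {
      if (++rounds > maxRounds) {
        throw new Error(`Exceeded ${maxRounds} model/tool rounds in a single executor invocation`)
      }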
      const messageId = crypto.randomUUID()
      let text = ''
      const toolCallsByIndex = new Map<number, ChatRequestToolCall>()
      const toolSchemas = ctx.tools.all().map((tool) => {
        const described = tool.describe()
        return {
          type: 'function',
          function: {
            name: described.name,
            description: described.description,
            parameters: convertToProviderJsonSchema(described.inputSchema),
          },
        }
      })

      const response = await fetch(requireEnv('CHAT_COMPLETIONS_ENDPOINT'), {
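        method: 'POST',
        // Forward the turn's abort signal so aborts and disconnects cancel the request.
        signal: ctx.abortSignal,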
        headers: {
          Authorization: `Bearer ${requireEnv('API_KEY')}`,
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          model: requireEnv('MODEL_ID'),
          stream: true,
          messages,
          tools: toolSchemas.length > 0 ? toolSchemas : undefined,
        }),
      })

      if (!response.ok || !response.body) {
        throw new Error(`Model request failed with status: ${response.status}`)
      }

      const reader = response.body.getReader()
      const decoder = new TextDecoder()
      let buffer = ''

      while (true) {
        const { value, done } = await reader.read()
        if (done) break

        buffer += decoder.decode(value, { stream: true })
        const lines = buffer.split('\n')
        buffer = lines.pop() ?? ''

        for (const line of lines) {
          const trimmed = line.trim()
          if (!trimmed.startsWith('data:')) continue

          const data = trimmed.slice(5).trim()
          if (data === '[DONE]') continue

          const event = JSON.parse(data)
          const delta = event.choices?.[0]?.delta

          if (delta?.content) {
            text += delta.content
            helpers.reportMessage(messageId, delta.content)
          }

          for (const partial of delta?.tool_calls ?? []) {
            const index = partial.index ?? 0
            const existing = toolCallsByIndex.get(index) ?? {
              id: partial.id ?? crypto.randomUUID(),
              type: 'function',
              function: { name: '', arguments: '' },
            }

            existing.id = partial.id ?? existing.id
            existing.function.name = partial.function?.name ?? existing.function.name
            existing.function.arguments += partial.function?.arguments ?? ''
            toolCallsByIndex.set(index, existing)
          }
        }
      }

      const toolCalls: ChatRequestToolCall[] = [...toolCallsByIndex.values()]

      // If no tools are requested, we're done with this turn
      if (toolCalls.length === 0) {
        helpers.reportMessage(messageId, '', { isComplete: true })
        const finalMessage = new Message({
          id: messageId,
          role: 'assistant',
          content: text,
          createdAt: new Date(),
          updatedAt: new Date(),
        })
        await ctx.storeMessage(finalMessage)
        ctx.ack()
        return
      }

      // Record assistant's intent to call tools in history
      messages.push({
        role: 'assistant',
        content: text || null,
        tool_calls: toolCalls,
      })

      // Execute all tool calls sequentially
      for (const tc of toolCalls) {
        const args = tc.function.arguments ? JSON.parse(tc.function.arguments) : {}
        const toolName = tc.function.name

        helpers.reportToolCall(tc.id, { tool: toolName, args })

        const tool = ctx.tools.get(toolName)
        if (!tool) throw new Error(`Tool not found: ${toolName}`)

        // Execute passing context; the tool computes call ID internally
        const startedAt = new Date()
        const rawResults = await tool.executor(ctx)(args)
        const ArtifactCtor = tool.artifactConstructor?.() ?? SpooledArtifact
        const results =
          typeof rawResults === 'string' || rawResults instanceof Uint8Array
            ? new ArtifactCtor(spoolStore.write(tc.id, rawResults))
            : rawResults
        const completedAt = new Date()
        const checksum = await computeToolCallChecksum(toolName, args)

        helpers.reportToolCall(tc.id, { results, isComplete: true })

        const persistedToolCall = new ToolCall({
          id: tc.id,
          checksum,
          tool: toolName,
          args,
          results,
          isError: false,
          isComplete: true,
          completedAt,
          createdAt: startedAt, // when execution began, so createdAt <= completedAt
          updatedAt: completedAt,
        })

        await ctx.storeToolCall(persistedToolCall)

        messages.push({
          role: 'tool',
          tool_call_id: tc.id,
          content: typeof rawResults === 'string' ? rawResults : JSON.stringify(rawResults),
        })
      }
    }
  } catch (error) {
    ctx.nack(error instanceof Error ? error : new Error(String(error)))
  }
}

Common Failures

typescript
// WRONG: no signal is sent, so the dispatch never completes
const silentExecutor: DispatchExecutorFn = async (ctx, helpers) => {
  await callModel() // callModel is a placeholder for your provider call
  return
}

// RIGHT: always signal the outcome
const signallingExecutor: DispatchExecutorFn = async (ctx, helpers) => {
  await callModel()
  ctx.ack()
}
typescript
// WRONG: catching an error without calling nack leaves the turn hanging
const swallowingExecutor: DispatchExecutorFn = async (ctx, helpers) => {
  try {
    await doWork()
    ctx.ack()
  } catch (e) {
    console.error(e) // Swallowed: no signal is sent, so the dispatch never completes.
  }
}

// RIGHT: propagate errors immediately via nack
const propagatingExecutor: DispatchExecutorFn = async (ctx, helpers) => {
  try {
    await doWork()
    ctx.ack()
  } catch (e) {
    ctx.nack(e instanceof Error ? e : new Error(String(e)))
  }
}
typescript
// WRONG — reasoning models belong strictly in the executor seam, not pipelines
const wrongPipelineStep = async (ctx: TurnContext, next: () => Promise<void>) => {
  const res = await callModel({ /* ... */ })
  await next()
}

// RIGHT — delegate intelligence to the executor, use pipelines for data loading
const executor: DispatchExecutorFn = async (ctx, helpers) => {
  const res = await callModel({ /* ... */ })
  ctx.ack()
}

The Reference Implementation

Before building your own parser or handling complex streaming states, read the OpenAIChatCompletionsAdapter source. It shows:

  • The strict coordination between helpers.report*, ctx.store*, and ctx.ack().
  • Accurate reassembly of partial, out-of-order SSE chunks.
  • Three-tier configuration merging (constructor configuration -> runtime executor overrides -> per-turn overrides stored in the Registry via ctx.stash.get('...')).
  • Forwarding ctx.abortSignal to handle timeouts and client disconnect aborts.
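The three-tier merge can be sketched with object spread, where later tiers win. The option names (`model`, `temperature`, `maxTokens`) and the `mergeConfig` helper are hypothetical. The per-turn tier is modeled as a plain object rather than a read from `ctx.stash`. Dropping `undefined` values so they cannot erase an earlier tier is a choice made for this sketch, not necessarily the adapter's behavior.

```typescript
// Hypothetical option set; the adapter's real configuration has its own fields.
interface ChatOptions {
  model: string
  temperature?: number
  maxTokens?: number
}

// Drops keys whose value is undefined so they cannot erase an earlier tier.
function definedOnly<T extends object>(tier: T): Partial<T> {
  return Object.fromEntries(
    Object.entries(tier).filter(([, value]) => value !== undefined),
  ) as Partial<T>
}

function mergeConfig(
  constructorConfig: ChatOptions,
  executorOverrides: Partial<ChatOptions> = {},
  perTurnOverrides: Partial<ChatOptions> = {},
): ChatOptions {
  // Later spreads win: constructor < executor overrides < per-turn overrides.
  return {
    ...constructorConfig,
    ...definedOnly(executorOverrides),
    ...definedOnly(perTurnOverrides),
  }
}
```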

A battery adapter is not a special case built on internal engine hacks. It implements the exact same DispatchExecutorFn contract you write by hand. See LLM batteries to learn how to deploy and configure them.


Rendering Media into Provider Content Blocks

If your custom executor allows the model to receive or generate media, satisfy three requirements:

  1. Map Media instances into provider-specific payloads. When media objects are supplied inside ToolCall.results or Message.attachments, the executor is responsible for converting them into the target API's structural blocks (e.g. base64-encoded inline blocks or streaming uploads via media.stream(), media.asBytes(), or media.asBase64()).
  2. Handle unsupported modalities explicitly. Make the behavior configurable through an unsupportedMediaPolicy option. The native Chat Completions adapter implements three policies: 'throw', 'fallback-stash', and 'synthetic-description'. Default to 'throw' to avoid silently losing information.
  3. Respect Trust-Is-Content boundaries. Media.trustTier governs the trust envelope. The tool's trusted boolean must never override the trust level of the individual Media payload itself. See the Trust tiers → Media matrix for standard rendering rules.
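Here is a sketch of requirements 1 and 2 for image input. It assumes a chat-completions-style `image_url` content block carrying a base64 data URL. `MediaLike` is a hypothetical stand-in that only models `mimeType`, an optional description, and an `asBase64()` assumed here to be async; the real `Media` API may differ. The `'fallback-stash'` policy is omitted because its behavior depends on your stash, and trust-tier handling (requirement 3) is not shown.

```typescript
// Hypothetical stand-in for ADK's Media; only the members this sketch needs.
interface MediaLike {
  mimeType: string
  description?: string
  asBase64(): Promise<string>
}

type ContentBlock =
  | { type: 'text'; text: string }
  | { type: 'image_url'; image_url: { url: string } }

type UnsupportedMediaPolicy = 'throw' | 'synthetic-description'

async function renderMedia(
  media: MediaLike,
  policy: UnsupportedMediaPolicy = 'throw', // fail loudly by default
): Promise<ContentBlock> {
  if (media.mimeType.startsWith('image/')) {
    const data = await media.asBase64()
    return { type: 'image_url', image_url: { url: `data:${media.mimeType};base64,${data}` } }
  }
  if (policy === 'synthetic-description') {
    // Replace the payload with a textual placeholder the model can reason about.
    const label = media.description ?? 'no description available'
    return { type: 'text', text: `[Unsupported ${media.mimeType} attachment: ${label}]` }
  }
  throw new Error(`Unsupported media type for this provider: ${media.mimeType}`)
}
```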

Scanning is a Network Concern

Antivirus checking or DLP scanning of media payloads is a deployment-level infrastructure concern. Executors are transit layers—they forward media byte payloads and must not run internal scanning logic. Place scanners in ingress/egress middleware or as network proxy policies.