Minimal agent assembly
Minimal here means the smallest real-model assembly. The no-key scaffold was the Quickstart; this is where you plug in the engine. The two pages share src/noop-storage.ts and src/hydrate-messages.ts verbatim — only src/agent.ts changes, and inside it, only the executor slot.
This page gives you one linear setup that sends Hello to an assistant and streams the reply.
Install
Use Node 20+ and run this in a TypeScript project:
npm install @nhtio/adk
npm install -D tsx typescript @types/node

yarn add @nhtio/adk
yarn add -D tsx typescript @types/node

pnpm add @nhtio/adk
pnpm add -D tsx typescript @types/node

bun add @nhtio/adk
bun add -d tsx typescript @types/node

This guide assumes TypeScript ESM and uses top-level await, which tsx supports.
File structure
Create exactly these three files:
src/
  noop-storage.ts
  hydrate-messages.ts
  agent.ts

src/noop-storage.ts
Put this in src/noop-storage.ts; it supplies the complete no-op storage adapter with the required callback arities.
// The 25-callback no-op storage adapter.
//
// Spread this into TurnRunnerConfig as a baseline, then override only the
// callbacks you actually want to do work. The runtime validator requires the
// declared arity (1 for fetch, 2 for store/mutate/delete); a zero-arity
// callback will throw E_INVALID_TURN_RUNNER_CONFIG at construction.
import type {
  MemoryRetrievalFn,
  MessageRetrievalFn,
  ThoughtRetrievalFn,
  ToolCallRetrievalFn,
  ToolsRetrievalFn,
  RetrievableRetrievalFn,
  StandingInstructionsRefreshFn,
  MemoryStoreFn,
  MemoryMutateFn,
  MemoryDeleteFn,
  MessageStoreFn,
  MessageMutateFn,
  MessageDeleteFn,
  ThoughtStoreFn,
  ThoughtMutateFn,
  ThoughtDeleteFn,
  ToolCallStoreFn,
  ToolCallMutateFn,
  ToolCallDeleteFn,
  RetrievableStoreFn,
  RetrievableMutateFn,
  RetrievableDeleteFn,
  StandingInstructionStoreFn,
  StandingInstructionMutateFn,
  StandingInstructionDeleteFn,
} from '@nhtio/adk'

export const noopStorageAdapter = {
  // Memories
  fetchMemoriesCallback: (async (_ctx) => []) as MemoryRetrievalFn,
  storeMemoryCallback: (async (_ctx, _m) => {}) as MemoryStoreFn,
  mutateMemoryCallback: (async (_ctx, _m) => {}) as MemoryMutateFn,
  deleteMemoryCallback: (async (_ctx, _id) => {}) as MemoryDeleteFn,

  // Messages
  fetchMessagesCallback: (async (_ctx) => []) as MessageRetrievalFn,
  storeMessageCallback: (async (_ctx, _m) => {}) as MessageStoreFn,
  mutateMessageCallback: (async (_ctx, _m) => {}) as MessageMutateFn,
  deleteMessageCallback: (async (_ctx, _id) => {}) as MessageDeleteFn,

  // Thoughts
  fetchThoughtsCallback: (async (_ctx) => []) as ThoughtRetrievalFn,
  storeThoughtCallback: (async (_ctx, _t) => {}) as ThoughtStoreFn,
  mutateThoughtCallback: (async (_ctx, _t) => {}) as ThoughtMutateFn,
  deleteThoughtCallback: (async (_ctx, _id) => {}) as ThoughtDeleteFn,

  // ToolCalls
  fetchToolCallsCallback: (async (_ctx) => []) as ToolCallRetrievalFn,
  storeToolCallCallback: (async (_ctx, _tc) => {}) as ToolCallStoreFn,
  mutateToolCallCallback: (async (_ctx, _tc) => {}) as ToolCallMutateFn,
  deleteToolCallCallback: (async (_ctx, _id) => {}) as ToolCallDeleteFn,

  // Tools (supplementary tools the model can see, fetched per turn)
  fetchToolsCallback: (async (_ctx) => []) as ToolsRetrievalFn,

  // Retrievables
  fetchRetrievablesCallback: (async (_ctx) => []) as RetrievableRetrievalFn,
  storeRetrievableCallback: (async (_ctx, _r) => {}) as RetrievableStoreFn,
  mutateRetrievableCallback: (async (_ctx, _r) => {}) as RetrievableMutateFn,
  deleteRetrievableCallback: (async (_ctx, _id) => {}) as RetrievableDeleteFn,

  // Standing instructions (string | Tokenizable; no class primitive)
  refreshStandingInstructionsCallback: (async (_ctx) => []) as StandingInstructionsRefreshFn,
  storeStandingInstructionCallback: (async (_ctx, _v) => {}) as StandingInstructionStoreFn,
  mutateStandingInstructionCallback: (async (_ctx, _v) => {}) as StandingInstructionMutateFn,
  deleteStandingInstructionCallback: (async (_ctx, _v) => {}) as StandingInstructionDeleteFn,
}

src/hydrate-messages.ts
Put this in src/hydrate-messages.ts; it fetches messages and loads them into the turn before dispatch.
// Canonical turnInputPipeline middleware: load conversation history into the
// turn context before the executor sees it.
//
// ADK does NOT auto-call fetchMessagesCallback. Until a middleware like this
// runs, ctx.turnMessages is an empty Set and the executor reasons about
// nothing. Put this first in turnInputPipeline.
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'

export const hydrateMessages: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const messages = await ctx.fetchMessages()
  for (const m of messages) {
    ctx.turnMessages.add(m)
  }
  await next()
}

src/agent.ts
Put this in src/agent.ts; it wires storage, hydration, the OpenAI battery, a seeded user message, stream events, and one turn.
import { Message, TurnRunner } from '@nhtio/adk'
import type { MessageRetrievalFn } from '@nhtio/adk'
import { OpenAIChatCompletionsAdapter } from '@nhtio/adk/batteries/llm'
import { hydrateMessages } from './hydrate-messages'
import { noopStorageAdapter } from './noop-storage'

function requireEnv(name: string): string {
  const value = process.env[name]
  if (value === undefined || value.length === 0) {
    throw new Error(`Missing required environment variable: ${name}`)
  }
  return value
}

const initialUserMessage = new Message({
  id: crypto.randomUUID(),
  role: 'user',
  content: 'Hello',
  createdAt: new Date(),
  updatedAt: new Date(),
})

const fetchMessagesCallback: MessageRetrievalFn = async (_ctx) => {
  return [initialUserMessage]
}

const openai = new OpenAIChatCompletionsAdapter({
  apiKey: requireEnv('OPENAI_API_KEY'),
  model: process.env.OPENAI_MODEL ?? 'gpt-4o',
  autoAck: true,
})

const runner = new TurnRunner({
  ...noopStorageAdapter,
  fetchMessagesCallback,
  turnInputPipeline: [hydrateMessages],
  executorCallback: openai.executor(),
})

runner.on('message', (chunk) => {
  process.stdout.write(chunk.aDelta ?? '')
})

runner.observe('error', (err) => {
  console.error('[error]', err.message)
})

runner.observe('turnEnd', ({ turnId }) => {
  console.error(`\n[turn ended] ${turnId}`)
})

await runner.run({
  turnAbortController: new AbortController(),
  systemPrompt: 'You are a helpful assistant.',
  standingInstructions: [],
})

autoAck: true tells the executor to call ctx.ack() automatically after a tool-call-free response. Without it, the executor stores the message but leaves turn completion to you, so this turn would never end unless something else calls ctx.ack().
Run it
Set your OpenAI API key and execute the agent:
OPENAI_API_KEY=sk-... npx tsx src/agent.ts
OPENAI_API_KEY=sk-... yarn tsx src/agent.ts
OPENAI_API_KEY=sk-... pnpm exec tsx src/agent.ts
OPENAI_API_KEY=sk-... bunx tsx src/agent.ts

You should see the assistant response stream to stdout.
RawTurnContext
The TurnRunner takes a RawTurnContext when executing runner.run(rawCtx). It consists of exactly four fields:
| Field | Required | Description |
|---|---|---|
| turnAbortController | Yes | An AbortController instance to handle cancellation. |
| systemPrompt | Yes | A string or Tokenizable containing system-level behavior guidelines. |
| standingInstructions | Yes | An array of string or Tokenizable elements. The TypeScript interface demands this field, even though the underlying runtime schema defaults it to []. Pass [] explicitly in your code to satisfy the compiler. |
| stash | No | A Record<string, unknown> to store arbitrary, turn-scoped state metadata. Defaults to {} in raw input and is exposed as ctx.stash: Registry. |
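
The table says only that turnAbortController handles cancellation. As a minimal sketch of one way to put that field to work, the helper below builds a standard AbortController that aborts itself after a deadline. The name abortAfter and its return shape are hypothetical and not part of ADK, and how the runner reacts to an aborted signal is not covered on this page.

```typescript
// Hypothetical helper (not part of ADK): an AbortController that aborts
// itself after `ms` milliseconds unless `cancel()` is called first.
function abortAfter(ms: number): { controller: AbortController; cancel: () => void } {
  const controller = new AbortController()
  const timer = setTimeout(() => {
    controller.abort(new Error(`Turn exceeded ${ms} ms`))
  }, ms)
  // Clear the timer as soon as the signal aborts for any reason, so a
  // manual abort does not leave a pending timer behind.
  controller.signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
  return { controller, cancel: () => clearTimeout(timer) }
}
```

Under these assumptions you would pass controller as turnAbortController and call cancel() once runner.run() settles, so the pending timer does not keep the process alive.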
Messages do not go in RawTurnContext
RawTurnContext has zero knowledge of conversation history or user messages. ADK does not automatically fetch or inject messages.
Your turnInputPipeline middleware is responsible for calling ctx.fetchMessages() and loading them into ctx.turnMessages. To pass a user message into the first turn, return it from fetchMessagesCallback and register the hydration middleware.
Why the hydrate middleware is required
ADK keeps message retrieval explicit, so fetchMessagesCallback is only used when your pipeline calls it.
In this assembly:
- fetchMessagesCallback returns the seeded Message.
- hydrateMessages calls ctx.fetchMessages().
- hydrateMessages loads the returned messages into ctx.turnMessages.
- The OpenAI executor reads ctx.turnMessages and streams a reply.
- The message listener writes each chunk.aDelta to stdout.
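
The ordering above can be illustrated with a toy model. This is only a sketch with local stand-in types (SketchContext, SketchMiddleware, runSketchTurn, and countingExecutor are all hypothetical names); it is not ADK's dispatcher, and it assumes the executor runs after the input pipeline has finished calling next().

```typescript
// Stand-in types for illustration only; the real ADK context is richer.
type SketchContext = {
  fetchMessages: () => Promise<string[]>
  turnMessages: Set<string>
}
type SketchMiddleware = (ctx: SketchContext, next: () => Promise<void>) => Promise<void>

// Same shape as hydrateMessages: fetch, load into the turn, then continue.
const hydrate: SketchMiddleware = async (ctx, next) => {
  for (const m of await ctx.fetchMessages()) ctx.turnMessages.add(m)
  await next()
}

// Run each middleware in order; once the pipeline is exhausted, run the executor.
async function runSketchTurn(
  pipeline: SketchMiddleware[],
  executor: (ctx: SketchContext) => Promise<string>
): Promise<string> {
  const ctx: SketchContext = {
    fetchMessages: async () => ['Hello'],
    turnMessages: new Set<string>(),
  }
  let reply = ''
  const dispatch = async (i: number): Promise<void> => {
    const middleware = pipeline[i]
    if (middleware) {
      await middleware(ctx, () => dispatch(i + 1))
    } else {
      reply = await executor(ctx)
    }
  }
  await dispatch(0)
  return reply
}

// Stand-in executor: reports how many messages it can see.
const countingExecutor = async (ctx: SketchContext): Promise<string> =>
  `saw ${ctx.turnMessages.size} message(s)`
```

In this model, runSketchTurn([hydrate], countingExecutor) lets the executor see the seeded message, while runSketchTurn([], countingExecutor) leaves it with an empty set, which mirrors why the hydration middleware has to be registered.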
Using a manual executor
If you cannot or do not want to use the battery, see Bring your own LLM for the manual executor path.
What this assembly lacks
This is a structural baseline for one working streamed turn. Before using it as an application foundation, replace or extend:
- Noop Storage — Messages and execution artifacts are not persisted. The seeded user message exists only in this process.
- No Tools — The default configuration has no tools.
- No Retrieval/Memory — There are no hooks loading long-term memory, documents, or vector search results.
- Minimal Error Policy — Errors are observed and printed, but there is no retry, fallback model, timeout policy, or user-facing recovery path.
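
As a sketch of what replacing the first item might involve, the block below keeps messages in a Map and preserves the arities the no-op adapter declares. SketchMessage and the untyped ctx parameter are stand-ins chosen for illustration; real overrides would use the ADK callback types imported in src/noop-storage.ts, and the assumption that delete receives a string id is not confirmed by this page.

```typescript
// Stand-in message shape for illustration; real callbacks receive ADK types.
type SketchMessage = { id: string; role: string; content: string }

const store = new Map<string, SketchMessage>()

// Each callback keeps the declared arity: one parameter for fetch,
// two for store, mutate, and delete.
const inMemoryMessages = {
  fetchMessagesCallback: async (_ctx: unknown) => Array.from(store.values()),
  storeMessageCallback: async (_ctx: unknown, m: SketchMessage) => {
    store.set(m.id, m)
  },
  mutateMessageCallback: async (_ctx: unknown, m: SketchMessage) => {
    store.set(m.id, m)
  },
  deleteMessageCallback: async (_ctx: unknown, id: string) => {
    store.delete(id)
  },
}

// Function.length reports the declared parameter count, which is the kind of
// property an arity check can inspect.
const arities: Record<string, number> = {}
for (const [name, fn] of Object.entries(inMemoryMessages)) {
  arities[name] = fn.length
}
```

Spreading an object like this after ...noopStorageAdapter would override only the four message callbacks and leave the remaining no-ops in place.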
The upgrade path
Introduce capabilities one at a time:
- Replace the no-op callbacks with a real persistence layer: see Bring your own storage
- Customize or replace the model executor: see Bring your own LLM
- Equip your agent with capability functions: see Bring your own tools
- Wire context injection in turnInputPipeline: see Bring your own retrieval
- Enforce business rules and rate limiting via middleware: see Wiring the Pipelines
runner.run() returns Promise<void>
A common point of failure is expecting runner.run() to return the assistant response. It does not. All outputs are pushed asynchronously through event emitters, so register listeners before run().
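
If you need the full reply as a string, one option is to accumulate the streamed deltas yourself. The sketch below uses Node's EventEmitter as a stand-in for the runner (SketchRunner and collectReply are hypothetical names); it assumes only what this page states, namely that message chunks carry an optional aDelta and that run() resolves with no value.

```typescript
import { EventEmitter } from 'node:events'

// Stand-in for the runner, for illustration only: it emits 'message' chunks
// with an aDelta field and resolves run() with no value.
class SketchRunner extends EventEmitter {
  async run(): Promise<void> {
    for (const aDelta of ['Hel', 'lo', '!']) {
      this.emit('message', { aDelta })
    }
  }
}

// Attach the listener first, await run(), then read what was collected.
async function collectReply(runner: SketchRunner): Promise<string> {
  let reply = ''
  const onMessage = (chunk: { aDelta?: string }): void => {
    reply += chunk.aDelta ?? ''
  }
  runner.on('message', onMessage)
  try {
    await runner.run()
  } finally {
    // Detach so a later turn does not append to this buffer.
    runner.off('message', onMessage)
  }
  return reply
}
```

Whether the real runner exposes an off() method is an assumption here; check the listener API before relying on it.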
Register listeners before run()
// CORRECT: the listener is ready before streaming starts.
runner.on('message', (chunk) => process.stdout.write(chunk.aDelta ?? ''))
await runner.run(rawCtx)

// WRONG: the turn may finish before this listener exists.
await runner.run(rawCtx)
runner.on('message', (chunk) => process.stdout.write(chunk.aDelta ?? ''))

Event reference for this assembly
| Bus | Event | When it fires |
|---|---|---|
| Functional (runner.on) | message | Emitted when text chunks are streamed via helpers.reportMessage(). |
| Functional (runner.on) | thought | Emitted when reasoning steps are streamed via helpers.reportThought(). |
| Functional (runner.on) | toolCall | Emitted during tool execution transitions via helpers.reportToolCall(). |
| Observability (runner.observe) | error | Emitted for non-fatal turn pipeline errors and dispatch/executor failures. |
| Observability (runner.observe) | turnStart | Emitted when the runner initiates execution. |
| Observability (runner.observe) | turnEnd | Emitted immediately after a turn finishes, regardless of success or failure. |
| Observability (runner.observe) | turnGateOpen | Emitted when the turn gate opens. |
| Observability (runner.observe) | turnGateClosed | Emitted when the turn gate closes. |
| Observability (runner.observe) | toolExecutionStart | Emitted when tool execution begins. |
| Observability (runner.observe) | toolExecutionEnd | Emitted when tool execution ends. |
| Observability (runner.observe) | dispatchStart | Emitted when dispatch begins. |
| Observability (runner.observe) | dispatchEnd | Emitted when dispatch ends. |
| Observability (runner.observe) | iterationStart | Emitted when an execution-loop iteration begins. |
| Observability (runner.observe) | iterationEnd | Emitted when an execution-loop iteration ends. |
| Observability (runner.observe) | log | Emitted for runner log entries. |
For an in-depth exploration of message schemas, error boundaries, and telemetry channels, refer to Listening to the Assembly.