Listening to the Assembly
runner.run() returns Promise<void>. It resolves when the turn ends. It carries no data.
This is not an accident. Streaming responses arrive mid-turn. Callers must act on output before the turn finishes. If run() returned data, you would wait the entire turn before seeing any of it. The user's screen would stay blank while the agent works. Blank screens are not a streaming strategy.
All meaningful output leaves through events. The runner fires them as execution proceeds. Wire listeners before you call run().
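The contract can be modeled in a few lines. MiniRunner below is a hypothetical stand-in, not ADK's TurnRunner. Its run() returns Promise<void>, and its only output path is a 'message' listener. Recording a timeline shows every chunk reaching the listener before the promise settles, which a data-returning run() could not offer.

```typescript
// Hypothetical stand-in for a runner: run() resolves with no data,
// and all output leaves through 'message' listeners as it is produced.
type Chunk = { aDelta: string }
type Listener = (chunk: Chunk) => void

class MiniRunner {
  private listeners: Listener[] = []

  on(_event: 'message', listener: Listener): void {
    this.listeners.push(listener)
  }

  async run(parts: string[]): Promise<void> {
    for (const part of parts) {
      // Simulate tokens arriving over time from a model
      await Promise.resolve()
      for (const listener of this.listeners) listener({ aDelta: part })
    }
    // Resolves with undefined: the turn is over and there is nothing to return
  }
}

// Record the order in which things happen
const timeline: string[] = []
const runner = new MiniRunner()
runner.on('message', (chunk) => timeline.push(`chunk:${chunk.aDelta}`))

const done = runner.run(['Hel', 'lo']).then((result) => {
  timeline.push(`resolved:${String(result)}`)
})
```

A caller that only awaited run() would see nothing until the final timeline entry.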
Two Buses, One Rule
ADK has two event buses. They look similar. They use similar APIs. They are not interchangeable.
The functional bus drives application behavior. Output lands here. If you are not listening to the message event, the model's response goes nowhere. The functional bus is the product.
The observability bus is for telemetry, tracing, metrics, and debugging. It must never affect agent behavior. Remove every observability listener and the agent runs identically. The observability bus is the maintenance layer.
The separation rule:
If removing the listener changes agent behavior, it belongs on the functional bus. If removing it only affects telemetry, it belongs on the observability bus.
This is not a style convention. It is load-bearing architecture. If your telemetry changes behavior, you built a side-channel bug.
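If you want a guard against accidental side channels in your own telemetry code, one option is to wrap each observability callback so that an exception inside it is reported and swallowed. isolate() below is a hypothetical helper, and the console.warn default reporter is an illustrative choice. This page does not say whether ADK already isolates observer exceptions, so treat this as defensive wiring on your side.

```typescript
// Hypothetical helper: wrap a telemetry callback so that an exception thrown
// inside it is reported but never propagates into the code that emitted the event.
function isolate<T>(
  name: string,
  observer: (event: T) => void,
  report: (name: string, err: unknown) => void = (n, err) => console.warn(`observer ${n} failed`, err),
): (event: T) => void {
  return (event: T) => {
    try {
      observer(event)
    } catch (err) {
      report(name, err)
    }
  }
}

// Example: a buggy metrics observer that throws on every event
const failures: string[] = []
const safeObserver = isolate<{ turnId: string }>(
  'metrics',
  () => {
    throw new Error('metrics backend unavailable')
  },
  (name) => failures.push(name),
)

// Calling it does not throw; the failure is only recorded
safeObserver({ turnId: 't1' })
```

You would pass the wrapped function to runner.observe() in place of the raw handler.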
Practical Test
Remove all observability listeners. Run the agent. If the user still gets a correct response, your buses are clean. If the agent stops working, you have leaked functional logic into the telemetry layer.
// Practical validation test snippet
import { TurnRunner } from '@nhtio/adk'
// config and rawCtx are assumed to be defined elsewhere in your application
const runner = new TurnRunner(config)
const outputs: string[] = []
// Wire functional listeners ONLY
runner.on('message', (chunk) => outputs.push(chunk.aDelta ?? ''))
// ZERO runner.observe() calls are registered here
await runner.run(rawCtx)
// The agent must still produce output with no observability listeners attached
if (outputs.join('').length === 0) {
  throw new Error('Functional logic was broken by omitting observability')
}
The Functional Bus
Register functional listeners via runner.on(), remove them via runner.off(), and register one-shot listeners via runner.once().
import type { TurnRunner } from '@nhtio/adk'
runner.on('message', (chunk) => { /* ... */ })
runner.on('thought', (chunk) => { /* ... */ })
runner.on('toolCall', (call) => { /* ... */ })
Three events exist on this bus. Each fires only when the corresponding operation occurs: a turn that produces no reasoning emits no thought events, and a turn that calls no tools emits no toolCall events. The listeners, however, persist across turns. runner.on() is not a one-shot API (that is what runner.once() is for): the same listener handles every turn for the lifetime of the runner, or until you remove it with runner.off(). Wire once, receive on every turn.
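The registration semantics can be sketched with a toy registry. FunctionalBus below is hypothetical and untyped; it only assumes what this section states: on() listeners stay registered across turns until off(), and a once() listener removes itself after its first call. Each emit() stands in for one turn's output.

```typescript
type Handler = (payload: string) => void

// Hypothetical functional-bus registry mimicking on/off/once semantics
class FunctionalBus {
  private handlers = new Map<string, Set<Handler>>()

  on(event: string, handler: Handler): void {
    if (!this.handlers.has(event)) this.handlers.set(event, new Set())
    this.handlers.get(event)!.add(handler)
  }

  off(event: string, handler: Handler): void {
    this.handlers.get(event)?.delete(handler)
  }

  once(event: string, handler: Handler): void {
    // The wrapper unregisters itself before delegating, so it runs at most once
    const wrapper: Handler = (payload) => {
      this.off(event, wrapper)
      handler(payload)
    }
    this.on(event, wrapper)
  }

  emit(event: string, payload: string): void {
    const set = this.handlers.get(event)
    if (!set) return
    // Copy first so handlers removed during dispatch do not disturb iteration
    for (const handler of Array.from(set)) handler(payload)
  }
}

const bus = new FunctionalBus()
const persistent: string[] = []
const oneShot: string[] = []
const persistentHandler: Handler = (p) => persistent.push(p)

bus.on('message', persistentHandler)
bus.once('message', (p) => oneShot.push(p))

// Two turns' worth of output
bus.emit('message', 'turn 1')
bus.emit('message', 'turn 2')

// After off(), the persistent listener stops receiving
bus.off('message', persistentHandler)
bus.emit('message', 'turn 3')
```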
message
Fires when the model streams a visible assistant message chunk.
import type { TurnStreamableContent } from '@nhtio/adk'
runner.on('message', (chunk: TurnStreamableContent) => {
// chunk.aDelta — the incremental text since the last emission
// chunk.full — the accumulated text so far (useful for error recovery)
// chunk.id — stable identifier; groups all chunks from one generation
// chunk.updatedAt — DateTime when this content was last updated
// chunk.isComplete — true on the final chunk for this id
// chunk.completedAt — DateTime when the generation completed (optional)
process.stdout.write(chunk.aDelta ?? '')
})
aDelta is your streaming token. full is the accumulation if you need it. isComplete marks the final chunk of a generation; any further generation in the same turn arrives under a new id. Each distinct LLM generation within a turn gets its own id, so a tool loop that calls the model several times produces one id per response segment.
If you do not listen to message, model output goes nowhere. The agent works; the user sees nothing.
The underlying structure is defined by TurnStreamableContent.
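Because every chunk of one generation shares an id, a consumer can rebuild each generation separately. The sketch assumes a hypothetical ChunkLike type that carries only id, aDelta, full, and isComplete. GenerationAssembler appends each aDelta under its id and, on the final chunk, prefers full when present; that preference is a design choice for resilience, not documented ADK behavior.

```typescript
// Minimal subset of the message payload used here (hypothetical type)
interface ChunkLike {
  id: string
  aDelta?: string
  full?: string
  isComplete: boolean
}

// Rebuilds one text per generation id from streamed chunks
class GenerationAssembler {
  private buffers = new Map<string, string>()
  readonly completed = new Map<string, string>()

  accept(chunk: ChunkLike): void {
    const text = (this.buffers.get(chunk.id) ?? '') + (chunk.aDelta ?? '')
    this.buffers.set(chunk.id, text)
    if (chunk.isComplete) {
      // Prefer the runner's own accumulation if present; fall back to our buffer
      this.completed.set(chunk.id, chunk.full ?? text)
      this.buffers.delete(chunk.id)
    }
  }
}

const assembler = new GenerationAssembler()
// Two generations in one turn, e.g. before and after a tool call
assembler.accept({ id: 'g1', aDelta: 'Let me ', isComplete: false })
assembler.accept({ id: 'g1', aDelta: 'check.', full: 'Let me check.', isComplete: true })
assembler.accept({ id: 'g2', aDelta: 'Found it.', isComplete: true })
```

In an application, a message listener would pass each chunk to assembler.accept().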
thought
Fires when the model emits internal reasoning — chain-of-thought traces, extended thinking output, scratchpad content.
import type { TurnStreamableContent } from '@nhtio/adk'
runner.on('thought', (chunk: TurnStreamableContent) => {
// Same shape as TurnStreamableContent
// chunk.aDelta, chunk.full, chunk.id, chunk.updatedAt, chunk.isComplete, chunk.completedAt
})
Same shape as message. The distinction is semantic: message is output the user sees; thought is internal reasoning the model produces before committing to an answer. Whether to surface thoughts to the user depends on your product. A debugging console might show both; a production chat interface probably shows only message.
If your executor does not call helpers.reportThought(), this event never fires. It is optional.
The underlying structure is defined by TurnStreamableContent.
toolCall
Fires when the model requests a tool call, and again when that call completes.
import type { TurnToolCallContent } from '@nhtio/adk'
runner.on('toolCall', (call: TurnToolCallContent) => {
if (!call.isComplete) {
// Announced: model has requested the tool, execution hasn't started
console.log(`Tool requested: ${call.tool}`, call.args)
} else {
// Complete: tool has executed (or failed)
if (call.isError) {
console.error(`Tool ${call.tool} failed`, call.results)
} else {
console.log(`Tool ${call.tool} completed`, call.results)
}
}
})
Typically there are two emissions per tool call. The first has isComplete: false and no results; the model has announced the tool, but the handler has not run yet. The second has isComplete: true and results populated (or isError: true if the handler threw). Partial updates are also supported, so do not assume exactly two emissions in every case.
Use this for progress indicators: "Searching the database…" on the first emission, replaced by the result on the second.
call.checksum is an integrity fingerprint over tool and args. If your executor performs validation, you can verify the checksum before calling the handler.
The underlying structure is defined by TurnToolCallContent.
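The progress-indicator pattern reduces to a function from one toolCall emission to a status line. ToolCallLike is a hypothetical subset of the payload (tool, isComplete, isError), and the status strings are placeholders. Since partial updates may arrive, the function keys off isComplete rather than counting emissions.

```typescript
// Subset of the toolCall payload used for display (hypothetical type)
interface ToolCallLike {
  tool: string
  isComplete: boolean
  isError?: boolean
}

// Maps one emission to the text a UI would show for that call
function toolStatus(call: ToolCallLike): string {
  if (!call.isComplete) return `Running ${call.tool}...`
  return call.isError ? `${call.tool} failed` : `${call.tool} done`
}

const emissions: ToolCallLike[] = [
  { tool: 'searchDatabase', isComplete: false },
  { tool: 'searchDatabase', isComplete: false }, // a partial update
  { tool: 'searchDatabase', isComplete: true, isError: false },
]
const statuses = emissions.map(toolStatus)
```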
The Observability Bus
Register observability listeners via runner.observe(), remove them via runner.unobserve(), and register one-shot listeners via runner.observeOnce().
runner.observe('turnStart', (event) => { /* ... */ })
runner.observe('turnEnd', (event) => { /* ... */ })
runner.observe('log', (entry) => { /* ... */ })
runner.observe('error', (err) => { /* ... */ })
These events must not affect agent behavior. They fire alongside execution; they do not gate it.
Turn lifecycle events
import type { TurnStartEvent, TurnEndEvent } from '@nhtio/adk'
runner.observe('turnStart', (event: TurnStartEvent) => {
// event.turnId — stable identifier for this turn
// event.startedAt — DateTime when the turn began
spans.set(event.turnId, tracer.startSpan('turn', { startTime: event.startedAt.toMillis() }))
})
runner.observe('turnEnd', (event: TurnEndEvent) => {
// event.turnId — matches the corresponding turnStart
// event.startedAt — DateTime when the turn began
// event.endedAt — DateTime when the turn ended
// event.durationMs — wall-clock duration
const span = spans.get(event.turnId)
span?.finish()
spans.delete(event.turnId) // drop the finished span so the map does not grow without bound
})
turnStart fires immediately before the input pipeline runs. turnEnd fires after the pipeline completes, whether successfully, in error, or via abort. turnEnd always fires if turnStart fired.
The lifecycle structures are defined by TurnStartEvent and TurnEndEvent.
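Because turnEnd always follows a turnStart, the pair can drive an in-flight gauge that cannot drift when turns fail or abort. TurnTracker is a hypothetical helper that uses only turnId and durationMs from the lifecycle events; you would call onStart and onEnd from your turnStart and turnEnd observers.

```typescript
// Reduced shapes of the lifecycle events (hypothetical subsets)
interface StartLike { turnId: string }
interface EndLike { turnId: string; durationMs: number }

class TurnTracker {
  private active = new Set<string>()
  readonly durations: number[] = []

  onStart(event: StartLike): void {
    this.active.add(event.turnId)
  }

  onEnd(event: EndLike): void {
    // turnEnd fires for success, error, and abort alike, so this balances onStart
    this.active.delete(event.turnId)
    this.durations.push(event.durationMs)
  }

  get inFlight(): number {
    return this.active.size
  }
}

const tracker = new TurnTracker()
tracker.onStart({ turnId: 'a' })
tracker.onStart({ turnId: 'b' })
tracker.onEnd({ turnId: 'a', durationMs: 120 })
```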
Dispatch and iteration events
import type { DispatchStartEvent, DispatchEndEvent, IterationStartEvent, IterationEndEvent } from '@nhtio/adk'
runner.observe('dispatchStart', (event: DispatchStartEvent) => { /* fires once per dispatch */ })
runner.observe('dispatchEnd', (event: DispatchEndEvent) => { /* fires once per dispatch */ })
runner.observe('iterationStart', (event: IterationStartEvent) => { /* fires per LLM call */ })
runner.observe('iterationEnd', (event: IterationEndEvent) => { /* fires per LLM call */ })
A single turn runs one dispatch. A dispatch runs one or more iterations, one per LLM call; each tool loop cycle that sends results back to the model adds another iteration. Use iteration events to track iteration timing; use log and tool execution events for token counts and tool details.
These structures are defined by DispatchStartEvent, DispatchEndEvent, IterationStartEvent, and IterationEndEvent.
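To measure how deep a tool loop went, count iterationStart events between dispatchStart and dispatchEnd. IterationCounter below is hypothetical and relies on event order alone, not on payload fields. It also assumes dispatches on one runner do not overlap; you would call its methods from the corresponding observe() handlers.

```typescript
// Counts LLM calls (iterations) per dispatch using event order alone
class IterationCounter {
  private current = 0
  readonly perDispatch: number[] = []

  onDispatchStart(): void {
    this.current = 0
  }

  onIterationStart(): void {
    this.current += 1
  }

  onDispatchEnd(): void {
    this.perDispatch.push(this.current)
  }
}

const counter = new IterationCounter()
// One dispatch: the model calls a tool, then answers (two LLM calls)
counter.onDispatchStart()
counter.onIterationStart()
counter.onIterationStart()
counter.onDispatchEnd()
```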
Tool execution events
These fire inside tool handler execution — after argument validation passes, before the handler result is returned to the executor. Use them to measure tool latency, flag slow handlers, or build per-tool performance baselines.
import type { ToolExecutionStartEvent, ToolExecutionEndEvent } from '@nhtio/adk'
runner.observe('toolExecutionStart', (event: ToolExecutionStartEvent) => {
// event.callId: the tool execution call id (computed from tool name and raw args); not necessarily the same as any id on the functional toolCall event
// event.toolName — name of the tool being executed
// event.args — the validated arguments passed to the handler
// event.turnId — stable identifier for this turn
// event.startedAt — DateTime when tool execution started
})
runner.observe('toolExecutionEnd', (event: ToolExecutionEndEvent) => {
// event.callId: the same tool execution call id reported by the matching toolExecutionStart
// event.toolName — name of the tool executed
// event.turnId — stable identifier for this turn
// event.startedAt — DateTime when tool execution started
// event.endedAt — DateTime when tool execution ended
// event.durationMs — handler wall time
// event.isError — true when the handler threw
})
These structures are defined by ToolExecutionStartEvent and ToolExecutionEndEvent.
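A per-tool baseline needs only toolName, durationMs, and isError from toolExecutionEnd. ToolLatency below is a hypothetical aggregator that keeps per-tool call counts, error totals, and maximum latency, and collects calls slower than a threshold. The 2000 ms default is an arbitrary placeholder, not an ADK setting.

```typescript
// Subset of toolExecutionEnd used for latency tracking (hypothetical type)
interface ToolEndLike {
  toolName: string
  durationMs: number
  isError: boolean
}

interface ToolStats {
  calls: number
  errors: number
  totalMs: number
  maxMs: number
}

class ToolLatency {
  readonly stats = new Map<string, ToolStats>()
  readonly slow: ToolEndLike[] = []
  private readonly slowThresholdMs: number

  constructor(slowThresholdMs = 2000) {
    // Calls slower than this many milliseconds are collected in `slow`
    this.slowThresholdMs = slowThresholdMs
  }

  record(event: ToolEndLike): void {
    const s = this.stats.get(event.toolName) ?? { calls: 0, errors: 0, totalMs: 0, maxMs: 0 }
    s.calls += 1
    s.errors += event.isError ? 1 : 0
    s.totalMs += event.durationMs
    s.maxMs = Math.max(s.maxMs, event.durationMs)
    this.stats.set(event.toolName, s)
    if (event.durationMs > this.slowThresholdMs) this.slow.push(event)
  }

  averageMs(toolName: string): number | undefined {
    const s = this.stats.get(toolName)
    return s ? s.totalMs / s.calls : undefined
  }
}

const latency = new ToolLatency(1000)
latency.record({ toolName: 'search', durationMs: 300, isError: false })
latency.record({ toolName: 'search', durationMs: 1500, isError: true })
latency.record({ toolName: 'fetch', durationMs: 50, isError: false })
```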
log
Fires when the executor calls helpers.log.{trace|debug|info|warn|error}().
import type { LogEvent } from '@nhtio/adk'
runner.observe('log', (entry: LogEvent) => {
// entry.dispatchId — which dispatch this came from
// entry.iteration — which iteration within the dispatch
// entry.emittedAt — DateTime
// entry.level — 'trace' | 'debug' | 'info' | 'warn' | 'error'
// entry.kind — stable discriminator string authored by the executor
// entry.message — human-readable message
// entry.payload — optional structured detail
myLogger.log(entry.level, entry.message, entry.payload)
})
log is the executor's structured logging channel. Use kind as a stable discriminator for filtering in your log aggregator. payload is the structured detail block; use it for token counts, provider latency, tool names, and other per-iteration metrics.
The structure is defined by LogEvent.
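Because kind is stable, consumers can route entries without parsing message text. The sketch below assumes a hypothetical executor that logs model usage with kind 'llm.usage' and a payload of { totalTokens: number }; the executor author chooses both, so substitute your own. tokensPerDispatch sums those counts per dispatchId and skips entries whose payload lacks a numeric count.

```typescript
// Subset of the log payload used here (hypothetical type)
interface LogLike {
  dispatchId: string
  iteration: number
  level: 'trace' | 'debug' | 'info' | 'warn' | 'error'
  kind: string
  message: string
  payload?: unknown
}

// Sums token usage per dispatch from entries whose kind is 'llm.usage' (hypothetical kind)
function tokensPerDispatch(entries: LogLike[]): Map<string, number> {
  const totals = new Map<string, number>()
  for (const entry of entries) {
    if (entry.kind !== 'llm.usage') continue
    const tokens = (entry.payload as { totalTokens?: unknown } | undefined)?.totalTokens
    // Ignore entries whose payload does not carry a numeric count
    if (typeof tokens !== 'number') continue
    totals.set(entry.dispatchId, (totals.get(entry.dispatchId) ?? 0) + tokens)
  }
  return totals
}

const entries: LogLike[] = [
  { dispatchId: 'd1', iteration: 1, level: 'info', kind: 'llm.usage', message: 'usage', payload: { totalTokens: 120 } },
  { dispatchId: 'd1', iteration: 2, level: 'info', kind: 'llm.usage', message: 'usage', payload: { totalTokens: 80 } },
  { dispatchId: 'd1', iteration: 2, level: 'debug', kind: 'tool.args', message: 'args', payload: { q: 'x' } },
]
const totals = tokensPerDispatch(entries)
```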
error
Fires when an error occurs inside an input or output pipeline, or when a dispatch fails (such as when the executor throws or calls ctx.nack()).
runner.observe('error', (err) => {
errorReporter.captureException(err)
})
This fires for pipeline-level errors and for dispatch-level failures, such as the executor throwing or calling ctx.nack(). A failed dispatch is caught, emitted as an error event, and terminates the turn; turnEnd still fires afterward. TurnEndEvent carries only timing fields, so it cannot tell you whether the turn failed: a nack or executor failure surfaces on the error event, not on turnEnd.
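Since turnEnd does not report failure, one approach is to collect error events between turnStart and turnEnd and attach the outcome yourself. TurnOutcomes below is a hypothetical helper. It assumes turns on one runner never overlap (so the current turn is unambiguous) and treats any error event during a turn as a failure; pipeline errors that do not end the turn may deserve finer treatment.

```typescript
// Reduced lifecycle shapes (hypothetical subsets)
interface StartLike { turnId: string }
interface EndLike { turnId: string; durationMs: number }

type Outcome = { turnId: string; durationMs: number; failed: boolean; errors: unknown[] }

// Correlates error events with the enclosing turn, assuming turns do not overlap
class TurnOutcomes {
  private currentErrors: unknown[] = []
  readonly outcomes: Outcome[] = []

  onTurnStart(_event: StartLike): void {
    this.currentErrors = []
  }

  onError(err: unknown): void {
    this.currentErrors.push(err)
  }

  onTurnEnd(event: EndLike): void {
    this.outcomes.push({
      turnId: event.turnId,
      durationMs: event.durationMs,
      failed: this.currentErrors.length > 0,
      errors: this.currentErrors,
    })
    this.currentErrors = []
  }
}

const outcomes = new TurnOutcomes()
outcomes.onTurnStart({ turnId: 't1' })
outcomes.onTurnEnd({ turnId: 't1', durationMs: 40 })
outcomes.onTurnStart({ turnId: 't2' })
outcomes.onError(new Error('executor nacked'))
outcomes.onTurnEnd({ turnId: 't2', durationMs: 15 })
```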
Gate events
runner.observe('turnGateOpen', (event: TurnGate) => { /* ctx.waitFor() called — a gate was opened */ })
runner.observe('turnGateClosed', (event: TurnGateClosedEvent) => { /* gate settled: resolved, rejected, timed out, or aborted */ })
Gate events track ctx.waitFor() usage: human-in-the-loop patterns where the turn suspends until something external resolves the gate. You do not need to wire these unless your assembly uses waitFor. They are useful for tracing the gate lifecycle in long-running approval workflows.
The structures are defined by TurnGate and TurnGateClosedEvent.
Timing: Register Before You Run
Register listeners before run()
Calling runner.run() before registering listeners loses events. The message stream begins as soon as the executor emits its first chunk, and events emitted before a listener is attached are not replayed. If you are not already listening, those initial chunks are gone.
Register all listeners before invoking runner.run().
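The failure mode is easy to reproduce with a stand-in. In the hypothetical EagerRunner below, run() emits 'turnStart' synchronously before returning and emits 'message' on a later microtask. A listener attached on the line after run() misses the first event but still catches the second. How ADK schedules its own emissions is not specified here, so read this as an illustration of why the rule exists, not a model of ADK internals.

```typescript
type Listener = (event: string) => void

// Hypothetical runner that emits 'turnStart' synchronously, then streams asynchronously
class EagerRunner {
  private listeners: Listener[] = []

  on(listener: Listener): void {
    this.listeners.push(listener)
  }

  run(): Promise<void> {
    this.emit('turnStart') // happens before run() even returns
    return Promise.resolve().then(() => this.emit('message'))
  }

  private emit(event: string): void {
    for (const listener of this.listeners) listener(event)
  }
}

// Correct order: wire first, then run
const early = new EagerRunner()
const earlySeen: string[] = []
early.on((e) => earlySeen.push(e))
const earlyDone = early.run()

// Wrong order: run first, then wire
const late = new EagerRunner()
const lateSeen: string[] = []
const lateDone = late.run()
late.on((e) => lateSeen.push(e))
```

The late listener still receives the later message; what it loses is everything emitted before it was attached.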
// Too late — execution has already started, you will miss events
const promise = runner.run(rawCtx)
runner.on('message', handler)
await promise
// Wire first, then trigger the engine
runner.on('message', handler)
runner.observe('turnStart', observer)
await runner.run(rawCtx)
Concrete Wiring Patterns
Terminal streaming
The simplest functional wiring writes streaming chunks directly to stdout:
runner.on('message', (chunk) => {
process.stdout.write(chunk.aDelta ?? '')
})
runner.on('toolCall', (call) => {
if (!call.isComplete) {
process.stderr.write(`\n[tool: ${call.tool}]`)
}
})
await runner.run({
turnAbortController: new AbortController(),
systemPrompt: 'You are a helpful assistant.',
standingInstructions: [],
})
process.stdout.write('\n')
The functional bus carries the product. The observability bus stays empty in this context: no metrics, no spans, no logs. That is valid for a CLI tool.
SSE endpoint
A server-sent events endpoint feeds the functional bus directly to the client. The shape below uses a generic Node-style request/response pattern — adapt this to your HTTP framework of choice:
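import type { TurnStreamableContent, TurnToolCallContent } from '@nhtio/adk'
// runner and buildRawContext are assumed to be defined in the enclosing module
async function handleChatRequest(req: any, res: any) {
  // SSE responses need the event-stream content type and should not be cached
  res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache' })
  const onMessage = (chunk: TurnStreamableContent) => {
    res.write(`data: ${JSON.stringify({ type: 'message', ...chunk })}\n\n`)
  }
  const onToolCall = (call: TurnToolCallContent) => {
    res.write(`data: ${JSON.stringify({ type: 'toolCall', ...call })}\n\n`)
  }
  runner.on('message', onMessage)
  runner.on('toolCall', onToolCall)
  try {
    await runner.run(buildRawContext(req))
    res.write('data: {"type":"done"}\n\n')
  } finally {
    runner.off('message', onMessage)
    runner.off('toolCall', onToolCall)
    res.end()
  }
}
Register the listeners before run(). Remove them in a finally block, especially on an HTTP server where the runner persists across requests and listeners accumulate if not cleaned up. Because listeners are runner-wide rather than per-turn, a runner shared by overlapping requests would also deliver each request's chunks to every other open stream; if requests can overlap, give each one its own runner.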
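Pairing every on() with an off() by hand is easy to get wrong as handlers multiply. A small helper can register a group of functional listeners and return one disposer for the finally block. subscribeAll and the Emitter interface are hypothetical; the interface assumes only on(event, fn) and off(event, fn), so a runner with strictly typed event names may need an adapter or a cast. ToyEmitter stands in for the runner.

```typescript
type Fn = (payload: unknown) => void

// The only runner capabilities the helper relies on (hypothetical interface)
interface Emitter {
  on(event: string, fn: Fn): void
  off(event: string, fn: Fn): void
}

// Registers every handler and returns a single disposer that removes them all
function subscribeAll(emitter: Emitter, handlers: Record<string, Fn>): () => void {
  const entries = Object.entries(handlers)
  for (const [event, fn] of entries) emitter.on(event, fn)
  return () => {
    for (const [event, fn] of entries) emitter.off(event, fn)
  }
}

// Toy emitter standing in for a runner
class ToyEmitter implements Emitter {
  readonly registered = new Map<string, Set<Fn>>()

  on(event: string, fn: Fn): void {
    if (!this.registered.has(event)) this.registered.set(event, new Set())
    this.registered.get(event)!.add(fn)
  }

  off(event: string, fn: Fn): void {
    this.registered.get(event)?.delete(fn)
  }

  count(): number {
    let n = 0
    this.registered.forEach((set) => (n += set.size))
    return n
  }
}

const emitter = new ToyEmitter()
let countDuring = 0
const unsubscribe = subscribeAll(emitter, {
  message: () => {},
  toolCall: () => {},
})
try {
  countDuring = emitter.count()
  // await runner.run(...) would go here
} finally {
  unsubscribe()
}
const countAfter = emitter.count()
```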
Distributed tracing
The observability bus maps naturally to distributed tracing spans. Wire turnStart and turnEnd to your tracer:
const activeSpans = new Map<string, { end: (attrs: Record<string, unknown>) => void }>()
runner.observe('turnStart', (event) => {
activeSpans.set(event.turnId, tracer.startSpan('adk.turn', {
startTime: event.startedAt.toMillis(),
attributes: { 'adk.turnId': event.turnId },
}))
})
runner.observe('turnEnd', (event) => {
const span = activeSpans.get(event.turnId)
if (span) {
span.end({ 'adk.durationMs': event.durationMs })
activeSpans.delete(event.turnId)
}
})
runner.observe('log', (entry) => {
if (entry.level === 'error' || entry.level === 'warn') {
console.error(`[${entry.level}] ${entry.kind}: ${entry.message}`, entry.payload)
}
})
runner.observe('error', (err) => {
errorReporter.captureException(err)
})
The observability bus feeds the telemetry layer. The functional bus feeds the user. Neither crosses the other.
Structured per-turn logging
Wire the observability bus to your structured logger:
runner.observe('turnStart', ({ turnId }) => {
logger.info({ turnId }, 'turn started')
})
runner.observe('turnEnd', ({ turnId, durationMs }) => {
logger.info({ turnId, durationMs }, 'turn ended')
})
runner.observe('log', (entry) => {
logger[entry.level]({
dispatchId: entry.dispatchId,
iteration: entry.iteration,
kind: entry.kind,
payload: entry.payload,
}, entry.message)
})The executor's structured log entries (helpers.log.info(...)) arrive here. Use kind to filter by event type in your log aggregator. payload carries the structured metrics your executor produces per iteration.
The Buses Are Not Symmetric
The functional bus and the observability bus look similar but behave differently in one critical respect: functional listeners are part of the delivery contract.
A message listener that is removed breaks the agent for the user. There is no fallback. The functional bus is the only path streaming output takes out of the runner. If the listener is not there, the output goes nowhere.
An error observability listener that is removed means you stop seeing errors in your monitoring dashboard. The agent continues running. Users are unaffected. You just have less visibility.
This asymmetry is intentional. It means you can safely add, remove, and modify observability instrumentation without risking the agent. It means functional wiring must be treated with the same care as the executor — it is part of the production contract.
Treat functional listeners as infrastructure. Treat observability listeners as telemetry.