---
url: 'https://adk.nht.io/assembly/events.md'
description: >-
  Wire the functional and observability buses — the two event systems that carry
  output, telemetry, and lifecycle signals out of the runner.
---

# Listening to the Assembly

## LLM summary — Listening to the Assembly

* `runner.run()` returns `Promise<void>`. It resolves when the turn ends. It carries no data. All output leaves through events.
* Two buses. Different registration APIs. Different purpose. Mixing them creates bugs that are invisible until production.
* **Functional bus**: `runner.on(event, listener)`, `runner.off(event, listener)`, `runner.once(event, listener)`. Three events: `message`, `thought`, `toolCall`.
* **Observability bus**: `runner.observe(event, listener)`, `runner.unobserve(event, listener)`, `runner.observeOnce(event, listener)`. Events: `turnStart`, `turnEnd`, `turnGateOpen`, `turnGateClosed`, `toolExecutionStart`, `toolExecutionEnd`, `dispatchStart`, `dispatchEnd`, `iterationStart`, `iterationEnd`, `log`, `error`.
* Separation rule: **If removing the listener changes agent behavior, it belongs on the functional bus. If removing it only affects telemetry, it belongs on the observability bus.** This is not a style convention. It is load-bearing architecture. If your telemetry changes behavior, you built a side-channel bug.
* `message` event fires with [`TurnStreamableContent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/TurnStreamableContent): `{ id, createdAt, updatedAt, full, aDelta, isComplete, completedAt? }`. `aDelta` is the incremental chunk; `full` is the accumulated text so far.
* `thought` event fires with [`TurnStreamableContent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/TurnStreamableContent). Same shape as `message`. Carries internal reasoning traces — chain-of-thought, extended thinking.
* `toolCall` event fires with [`TurnToolCallContent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/TurnToolCallContent): `{ id, tool, args, checksum, createdAt, updatedAt, results?, isComplete, isError, completedAt? }`. Typically fires twice per tool: once when announced (no results), once when complete (results populated).
* Correlate `toolCall` (functional) with `toolExecutionStart`/`toolExecutionEnd` (observability) on `toolCall.checksum === toolExecution*.callId` — both are `sha256({tool,args})`. NOT `toolCall.id`. The hash collides by design for identical `(tool,args)` (that is what `toolCallCount` counts); separate instances by the `DateTime` fields (`createdAt`/`updatedAt`, `startedAt`/`endedAt`).
* `log` observability event carries [`LogEvent`](https://adk.nht.io/api/@nhtio/adk/dispatch_runner/interfaces/LogEvent): `{ dispatchId, iteration, emittedAt, level, kind, message, payload? }`. Fired by `helpers.log.{trace,debug,info,warn,error}()` in the executor.
* `error` observability event fires for pipeline errors and dispatch failures, including executor throw/`ctx.nack()`.
* **Register all listeners before calling `runner.run()`.** Events fire immediately as execution proceeds. A listener registered after `run()` will miss events — including `turnStart` and the beginning of the message stream.
* `runner.on()` is not a one-time-use API. The same listener handles all turns for the lifetime of the runner. Wire once, receive forever.
* Terminal streaming: `runner.on('message', chunk => process.stdout.write(chunk.aDelta ?? ''))`.
* SSE/WebSocket: functional bus feeds the stream to the client. Observability bus feeds your tracing, error reporting, and structured logging stack.
* The practical test: Remove all observability listeners. If the agent still delivers correct responses, your buses are clean. If the agent breaks, you have leaked functional logic into the observability layer.
* `TurnStartEvent`: `{ turnId: string, startedAt: DateTime }`.
* `TurnEndEvent`: `{ turnId: string, startedAt: DateTime, endedAt: DateTime, durationMs: number }`.

`runner.run()` returns `Promise<void>`. It resolves when the turn ends. It carries no data.

This is not an accident. Streaming responses arrive mid-turn. Callers must act on output before the turn finishes. If `run()` returned data, you would wait the entire turn before seeing any of it. The user's screen would stay blank while the agent works. Blank screens are not a streaming strategy.

All meaningful output leaves through events. The runner fires them as execution proceeds. Wire listeners before you call `run()`.

## Two Buses, One Rule

ADK has two event buses. They look similar. They use similar APIs. They are not interchangeable.

**The functional bus** drives application behavior. Output lands here. If you are not listening to the `message` event, the model's response goes nowhere. The functional bus is the product.

**The observability bus** is for telemetry, tracing, metrics, and debugging. It must never affect agent behavior. Remove every observability listener and the agent runs identically. The observability bus is the maintenance layer.

The separation rule:

> **If removing the listener changes agent behavior, it belongs on the functional bus. If removing it only affects telemetry, it belongs on the observability bus.**

This is not a style convention. It is load-bearing architecture. If your telemetry changes behavior, you built a side-channel bug.

::: tip Practical Test
Remove all observability listeners. Run the agent. If the user still gets a correct response, your buses are clean. If the agent stops working, you have leaked functional logic into the telemetry layer.

```ts
// Practical validation test snippet
const runner = new TurnRunner(config)
const outputs: string[] = []

// Wire functional listeners ONLY
runner.on('message', (chunk) => outputs.push(chunk.aDelta ?? ''))

// ZERO runner.observe() calls are registered here

await runner.run(rawCtx)

// Ensure output is still generated successfully
if (outputs.join('').length === 0) {
  throw new Error("Functional logic was broken by omitting observability")
}
```

:::

## The Functional Bus

Register functional listeners via `runner.on()`, remove them via `runner.off()`, and register one-shot listeners via `runner.once()`.

```typescript
import type { TurnRunner } from '@nhtio/adk'

// Assumes a TurnRunner constructed elsewhere in your assembly
declare const runner: TurnRunner

runner.on('message', (chunk) => { /* ... */ })
runner.on('thought', (chunk) => { /* ... */ })
runner.on('toolCall', (call) => { /* ... */ })
```

Three events exist on this bus. Each fires only when the corresponding operation occurs, so a turn that generates no content emits nothing. The listeners themselves persist across turns: `runner.on()` is not a one-time-use API. The same listener handles all turns for the lifetime of the runner. Wire once, receive forever.

### `message`

Fires when the model streams a visible assistant message chunk.

```typescript
import type { TurnStreamableContent } from '@nhtio/adk'

runner.on('message', (chunk: TurnStreamableContent) => {
  // chunk.aDelta — the incremental text since the last emission
  // chunk.full   — the accumulated text so far (useful for error recovery)
  // chunk.id     — stable identifier; groups all chunks from one generation
  // chunk.updatedAt — DateTime when this content was last updated
  // chunk.isComplete — true on the final chunk for this id
  // chunk.completedAt — DateTime when the generation completed (optional)
  process.stdout.write(chunk.aDelta ?? '')
})
```

`aDelta` is your streaming token. `full` is the accumulation if you need it. `isComplete` tells you when the generation is done; the next generation starts a new `id`. Each distinct LLM generation within a turn gets its own `id`, so a tool loop that makes several LLM calls produces one `id` per response segment.

If you do not listen to `message`, model output goes nowhere. The agent works; the user sees nothing.

The underlying structure is defined by [`TurnStreamableContent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/TurnStreamableContent).
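
As a minimal sketch of how `id`, `aDelta`, `full`, and `isComplete` could work together, the helper below groups chunks by `id` and records one finished string per generation. `MessageAssembler` and the local `StreamChunk` type are hypothetical: the type mirrors only the documented fields this example reads, and it assumes `full` is a string.

```typescript
// Local stand-in for the documented TurnStreamableContent fields used here.
// In real code, import the type from '@nhtio/adk' instead.
interface StreamChunk {
  id: string
  full: string
  aDelta?: string
  isComplete: boolean
}

// Hypothetical helper: accumulates streamed chunks per generation id.
class MessageAssembler {
  private readonly buffers = new Map<string, string>()
  readonly completed: { id: string; text: string }[] = []

  readonly handle = (chunk: StreamChunk): void => {
    const soFar = (this.buffers.get(chunk.id) ?? '') + (chunk.aDelta ?? '')
    if (!chunk.isComplete) {
      this.buffers.set(chunk.id, soFar)
      return
    }
    // On the final chunk, prefer `full` in case an earlier delta was missed.
    this.completed.push({ id: chunk.id, text: chunk.full || soFar })
    this.buffers.delete(chunk.id)
  }
}
```

In real wiring, `assembler.handle` would be the listener passed to `runner.on('message', ...)`.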

### `thought`

Fires when the model emits internal reasoning — chain-of-thought traces, extended thinking output, scratchpad content.

```typescript
import type { TurnStreamableContent } from '@nhtio/adk'

runner.on('thought', (chunk: TurnStreamableContent) => {
  // Same shape as the `message` payload
  // chunk.aDelta, chunk.full, chunk.id, chunk.updatedAt, chunk.isComplete, chunk.completedAt
})
```

Same shape as `message`. The distinction is semantic: `message` is output the user sees; `thought` is internal reasoning the model produces before committing to an answer. Whether to surface thoughts to the user depends on your product. A debugging console might show both; a production chat interface probably shows only `message`.

If your executor does not call `helpers.reportThought()`, this event never fires. It is optional.

The underlying structure is defined by [`TurnStreamableContent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/TurnStreamableContent).

### `toolCall`

Fires when the model requests a tool call, and again when that call completes.

```typescript
import type { TurnToolCallContent } from '@nhtio/adk'

runner.on('toolCall', (call: TurnToolCallContent) => {
  if (!call.isComplete) {
    // Announced: model has requested the tool, execution hasn't started
    console.log(`Tool requested: ${call.tool}`, call.args)
  } else {
    // Complete: tool has executed (or failed)
    if (call.isError) {
      console.error(`Tool ${call.tool} failed`, call.results)
    } else {
      console.log(`Tool ${call.tool} completed`, call.results)
    }
  }
})
```

Typically two emissions per tool call. The first has `isComplete: false` and no `results` — the model announced the tool but the handler hasn't run yet. The second has `isComplete: true` and `results` populated (or `isError: true` if the handler threw). Partial updates are also supported, so do not assume exactly two emissions in every case.

Use this for progress indicators: "Searching the database…" on the first emission, replaced by the result on the second.
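
The progress-indicator idea can be sketched as a small status map. This is an illustration under assumptions, not ADK API: `ToolCallUpdate`, `toolStatusLine`, and `applyToolCall` are hypothetical names, and the local type mirrors only the documented `TurnToolCallContent` fields read here.

```typescript
// Local stand-in for the documented TurnToolCallContent fields used here.
interface ToolCallUpdate {
  tool: string
  checksum: string
  isComplete: boolean
  isError: boolean
}

// Hypothetical helper: turns one toolCall emission into a status line.
function toolStatusLine(call: ToolCallUpdate): string {
  if (!call.isComplete) return `Running ${call.tool}...`
  return call.isError ? `${call.tool} failed` : `${call.tool} done`
}

// Keeps the latest line per checksum, so partial updates overwrite instead of appending.
function applyToolCall(status: Map<string, string>, call: ToolCallUpdate): Map<string, string> {
  status.set(call.checksum, toolStatusLine(call))
  return status
}
```

Because the map is keyed on `checksum`, repeated identical calls share one line. That is acceptable for a progress display, but it does not distinguish instances.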

`call.checksum` is an integrity fingerprint over `tool` and `args`. If your executor performs validation, you can verify the checksum before calling the handler.

#### Correlating `toolCall` with `toolExecutionStart` / `toolExecutionEnd`

Two buses, one invocation. Join them on the content hash, order them by time.

**Join key: `toolCall.checksum === toolExecution*.callId`.** Both are `sha256({ tool, args })` — the same value under two historical names. `toolCall.id` is the model/stream id, not the cross-bus key; never join on it.

**The hash collides by design.** Identical `(tool, args)` in one turn produce the same `checksum`/`callId` — that is exactly what [`DispatchContext.toolCallCount`](https://adk.nht.io/api/@nhtio/adk/types/interfaces/DispatchContext#toolcallcount) counts. So the hash identifies *what was called*, not *which instance*. To separate instances, use the timestamps: `createdAt` / `updatedAt` on the functional event, `startedAt` / `endedAt` on the execution events. Repeated identical calls are the loop the runner exists to detect, not a join you need to thread perfectly.

**Lifecycle, not 1:1 timing.** Order is `toolCall(isComplete:false)` → `toolExecutionStart` → `toolExecutionEnd` → `toolCall(isComplete:true)`. Partial `toolCall` updates mean you cannot count emissions — key your state map on the hash, stamp it with the timestamp.
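
One way to apply "key on the hash, stamp with the timestamp" is sketched below. `ToolTimeline` and the `*Like` types are hypothetical stand-ins that mirror only the documented fields, and `HasMillis` assumes the `DateTime` values expose a Luxon-style `toMillis()`.

```typescript
// Assumption: DateTime fields expose toMillis(), as Luxon's DateTime does.
interface HasMillis { toMillis(): number }

interface ToolCallLike { checksum: string; isComplete: boolean; updatedAt: HasMillis }
interface ToolExecStartLike { callId: string; startedAt: HasMillis }
interface ToolExecEndLike { callId: string; endedAt: HasMillis }

// 'pending' covers the announcement and any partial toolCall updates.
type Stage = 'pending' | 'executionStart' | 'executionEnd' | 'completed'
interface Stamp { stage: Stage; atMs: number }

// Hypothetical correlator: one list of timestamped stages per content hash.
class ToolTimeline {
  private readonly byHash = new Map<string, Stamp[]>()

  private record(hash: string, stage: Stage, atMs: number): void {
    const stamps = this.byHash.get(hash) ?? []
    stamps.push({ stage, atMs })
    this.byHash.set(hash, stamps)
  }

  readonly onToolCall = (c: ToolCallLike): void =>
    this.record(c.checksum, c.isComplete ? 'completed' : 'pending', c.updatedAt.toMillis())

  readonly onExecStart = (e: ToolExecStartLike): void =>
    this.record(e.callId, 'executionStart', e.startedAt.toMillis())

  readonly onExecEnd = (e: ToolExecEndLike): void =>
    this.record(e.callId, 'executionEnd', e.endedAt.toMillis())

  // Stages for one hash ordered by time; repeated identical calls show up as repeated stages.
  stagesFor(hash: string): Stage[] {
    return (this.byHash.get(hash) ?? [])
      .slice()
      .sort((a, b) => a.atMs - b.atMs)
      .map((s) => s.stage)
  }
}
```

The sketch orders by timestamp rather than arrival order, so it does not matter which bus delivers its event first.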

The underlying structure is defined by [`TurnToolCallContent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/TurnToolCallContent).

## The Observability Bus

Register observability listeners via `runner.observe()`, remove them via `runner.unobserve()`, and register one-shot listeners via `runner.observeOnce()`.

```typescript
runner.observe('turnStart', (event) => { /* ... */ })
runner.observe('turnEnd', (event) => { /* ... */ })
runner.observe('log', (entry) => { /* ... */ })
runner.observe('error', (err) => { /* ... */ })
```

These events must not affect agent behavior. They fire alongside execution; they do not gate it.
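
This page does not say whether the runner isolates exceptions thrown by observability listeners. Assuming it might not, a defensive wrapper like the hypothetical `safeObserver` below keeps a telemetry failure from propagating. It is a sketch that handles synchronous throws only; a listener that returns a rejected promise is not covered.

```typescript
// Hypothetical wrapper: runs a telemetry listener and contains any exception it throws.
function safeObserver<T>(
  listener: (event: T) => void,
  onFailure: (error: unknown) => void = () => {},
): (event: T) => void {
  return (event: T): void => {
    try {
      listener(event)
    } catch (error) {
      // Telemetry failures are reported out of band and never rethrown.
      onFailure(error)
    }
  }
}
```

Keep a reference to the wrapped function if you plan to remove it later, since `runner.unobserve()` takes the listener that was registered.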

### Turn lifecycle events

```typescript
import type { TurnStartEvent, TurnEndEvent } from '@nhtio/adk'

runner.observe('turnStart', (event: TurnStartEvent) => {
  // event.turnId    — stable identifier for this turn
  // event.startedAt — DateTime when the turn began
  // `spans` and `tracer` are placeholders for your own span registry and tracing client
  spans.set(event.turnId, tracer.startSpan('turn', { startTime: event.startedAt.toMillis() }))
})

runner.observe('turnEnd', (event: TurnEndEvent) => {
  // event.turnId    — matches the corresponding turnStart
  // event.startedAt — DateTime when the turn began
  // event.endedAt   — DateTime when the turn ended
  // event.durationMs — wall-clock duration
  const span = spans.get(event.turnId)
  span?.finish()
  spans.delete(event.turnId) // drop the entry so the registry does not grow with every turn
})
```

`turnStart` fires immediately before the input pipeline runs. `turnEnd` fires after the pipeline completes — whether successfully, in error, or via abort. `turnEnd` always fires if `turnStart` fired.

The lifecycle structures are defined by [`TurnStartEvent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/TurnStartEvent) and [`TurnEndEvent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/TurnEndEvent).
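
Because `turnEnd` is documented to follow every `turnStart`, the pair can back a simple in-flight registry. The sketch below is illustrative: `InFlightTurns` and the `*Like` types are hypothetical, and `HasMillis` assumes a Luxon-style `toMillis()` on `startedAt`.

```typescript
// Assumption: DateTime fields expose toMillis(), as Luxon's DateTime does.
interface HasMillis { toMillis(): number }
interface TurnStartLike { turnId: string; startedAt: HasMillis }
interface TurnEndLike { turnId: string; durationMs: number }

// Hypothetical registry of turns that have started but not yet ended.
class InFlightTurns {
  private readonly open = new Map<string, number>()
  readonly durations: number[] = []

  readonly onTurnStart = (e: TurnStartLike): void => {
    this.open.set(e.turnId, e.startedAt.toMillis())
  }

  readonly onTurnEnd = (e: TurnEndLike): void => {
    // turnEnd pairs with turnStart, so the entry can be released here.
    this.open.delete(e.turnId)
    this.durations.push(e.durationMs)
  }

  // Ids of turns that started more than maxAgeMs before nowMs and are still open.
  stuck(nowMs: number, maxAgeMs: number): string[] {
    const ids: string[] = []
    this.open.forEach((startedMs, turnId) => {
      if (nowMs - startedMs > maxAgeMs) ids.push(turnId)
    })
    return ids
  }
}
```

A non-empty `stuck()` result is a telemetry signal only. Acting on it to change agent behavior would move this logic off the observability bus.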

### Dispatch and iteration events

```typescript
import type { DispatchStartEvent, DispatchEndEvent, IterationStartEvent, IterationEndEvent } from '@nhtio/adk'

runner.observe('dispatchStart', (event: DispatchStartEvent) => { /* fires once per dispatch */ })
runner.observe('dispatchEnd', (event: DispatchEndEvent) => { /* fires once per dispatch */ })
runner.observe('iterationStart', (event: IterationStartEvent) => { /* fires per LLM call */ })
runner.observe('iterationEnd', (event: IterationEndEvent) => { /* fires per LLM call */ })
```

A single turn runs one dispatch. A dispatch runs one or more iterations, one per LLM call, so every cycle of a tool loop adds an iteration. Use iteration events to track iteration timing; use `log` and tool execution events for token counts and tool details.

These structures are defined by:

* [`DispatchStartEvent`](https://adk.nht.io/api/@nhtio/adk/dispatch_runner/interfaces/DispatchStartEvent)
* [`DispatchEndEvent`](https://adk.nht.io/api/@nhtio/adk/dispatch_runner/interfaces/DispatchEndEvent)
* [`IterationStartEvent`](https://adk.nht.io/api/@nhtio/adk/dispatch_runner/interfaces/IterationStartEvent)
* [`IterationEndEvent`](https://adk.nht.io/api/@nhtio/adk/dispatch_runner/interfaces/IterationEndEvent)

### Tool execution events

These events bracket tool handler execution: `toolExecutionStart` fires after argument validation passes, and `toolExecutionEnd` fires before the handler result is returned to the executor. Use them to measure tool latency, flag slow handlers, or build per-tool performance baselines.

```typescript
import type { ToolExecutionStartEvent, ToolExecutionEndEvent } from '@nhtio/adk'

runner.observe('toolExecutionStart', (event: ToolExecutionStartEvent) => {
  // event.callId    — sha256({ tool, args }); equals toolCall.checksum. The cross-bus join key, NOT toolCall.id
  // event.toolName  — name of the tool being executed
  // event.args      — the validated arguments passed to the handler
  // event.turnId    — stable identifier for this turn
  // event.startedAt — DateTime when tool execution started
})

runner.observe('toolExecutionEnd', (event: ToolExecutionEndEvent) => {
  // event.callId     — sha256({ tool, args }); equals toolCall.checksum. The cross-bus join key, NOT toolCall.id
  // event.toolName   — name of the tool executed
  // event.turnId     — stable identifier for this turn
  // event.startedAt  — DateTime when tool execution started
  // event.endedAt    — DateTime when tool execution ended
  // event.durationMs — handler wall time
  // event.isError    — true when the handler threw
})
```

These structures are defined by [`ToolExecutionStartEvent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/ToolExecutionStartEvent) and [`ToolExecutionEndEvent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/ToolExecutionEndEvent).
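
A per-tool baseline can be built from `toolExecutionEnd` alone. The sketch below is a hypothetical aggregator, not part of ADK: `createToolLatency` and `ToolExecEndLike` are illustrative names, and the type mirrors only the documented `toolName`, `durationMs`, and `isError` fields.

```typescript
// Local stand-in for the documented ToolExecutionEndEvent fields read here.
interface ToolExecEndLike {
  toolName: string
  durationMs: number
  isError: boolean
}

interface ToolStats {
  calls: number
  errors: number
  totalMs: number
}

// Hypothetical aggregator: per-tool counters plus a list of calls slower than slowMs.
function createToolLatency(slowMs: number) {
  const stats = new Map<string, ToolStats>()
  const slowCalls: ToolExecEndLike[] = []

  return {
    slowCalls,
    onExecEnd(e: ToolExecEndLike): void {
      const s: ToolStats = stats.get(e.toolName) ?? { calls: 0, errors: 0, totalMs: 0 }
      s.calls += 1
      if (e.isError) s.errors += 1
      s.totalMs += e.durationMs
      stats.set(e.toolName, s)
      if (e.durationMs > slowMs) slowCalls.push(e)
    },
    // Mean latency in milliseconds, or undefined for a tool that has not run.
    meanMs(toolName: string): number | undefined {
      const s = stats.get(toolName)
      return s ? s.totalMs / s.calls : undefined
    },
    errorCount(toolName: string): number {
      return stats.get(toolName)?.errors ?? 0
    },
  }
}
```

In real wiring, `latency.onExecEnd` would be the listener passed to `runner.observe('toolExecutionEnd', ...)`.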

### `log`

Fires when the executor calls `helpers.log.{trace|debug|info|warn|error}()`.

```typescript
import type { LogEvent } from '@nhtio/adk'

runner.observe('log', (entry: LogEvent) => {
  // entry.dispatchId — which dispatch this came from
  // entry.iteration  — which iteration within the dispatch
  // entry.emittedAt  — DateTime
  // entry.level      — 'trace' | 'debug' | 'info' | 'warn' | 'error'
  // entry.kind       — stable discriminator string authored by the executor
  // entry.message    — human-readable message
  // entry.payload    — optional structured detail

  myLogger.log(entry.level, entry.message, entry.payload)
})
```

`log` is the executor's structured logging channel. Use `kind` as a stable discriminator for filtering in your log aggregator. `payload` is the structured detail block — use it for token counts, provider latency, tool names, and other per-iteration metrics.

The structure is defined by [`LogEvent`](https://adk.nht.io/api/@nhtio/adk/dispatch_runner/interfaces/LogEvent).
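
Filtering on `level` and `kind` before forwarding keeps noisy entries out of your aggregator. The sketch below assumes the five documented level names; `filterLogs`, `LogEntryLike`, and the `kind` values used with it are hypothetical.

```typescript
type LogLevel = 'trace' | 'debug' | 'info' | 'warn' | 'error'

// Local stand-in for the documented LogEvent fields read here.
interface LogEntryLike {
  level: LogLevel
  kind: string
  message: string
  payload?: unknown
}

const LEVEL_RANK: Record<LogLevel, number> = { trace: 0, debug: 1, info: 2, warn: 3, error: 4 }

// Hypothetical helper: forwards entries at or above minLevel, optionally limited to some kinds.
function filterLogs(
  sink: (entry: LogEntryLike) => void,
  minLevel: LogLevel,
  kinds?: readonly string[],
): (entry: LogEntryLike) => void {
  return (entry: LogEntryLike): void => {
    if (LEVEL_RANK[entry.level] < LEVEL_RANK[minLevel]) return
    if (kinds && kinds.indexOf(entry.kind) === -1) return
    sink(entry)
  }
}
```

The returned function has the listener shape used above, so it could be passed to `runner.observe('log', ...)` in place of a direct logger call.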

### `error`

Fires when an error occurs inside an input or output pipeline, or when a dispatch fails (such as when the executor throws or calls `ctx.nack()`).

```typescript
runner.observe('error', (err) => {
  errorReporter.captureException(err)
})
```

The `error` event covers both pipeline-level errors and dispatch-level failures such as an executor throw or `ctx.nack()`. A failed dispatch is caught, surfaced on the `error` bus, and terminates the turn. `TurnEndEvent` carries only timing fields, so listen on `error`, not `turnEnd`, to learn that a dispatch failed.

### Gate events

```typescript
import type { TurnGate, TurnGateClosedEvent } from '@nhtio/adk'

runner.observe('turnGateOpen', (event: TurnGate) => { /* ctx.waitFor() called — a gate was opened */ })
runner.observe('turnGateClosed', (event: TurnGateClosedEvent) => { /* gate settled — resolved, rejected, timed out, or aborted */ })
```

Gate events track `ctx.waitFor()` usage — human-in-the-loop patterns where the turn suspends waiting for external resolution. You do not need to wire these unless your assembly uses `waitFor`. They are useful for tracing the gate lifecycle in long-running approval workflows.

The structures are defined by [`TurnGate`](https://adk.nht.io/api/@nhtio/adk/common/interfaces/TurnGate) and [`TurnGateClosedEvent`](https://adk.nht.io/api/@nhtio/adk/turn_runner/interfaces/TurnGateClosedEvent).

## Timing: Register Before You Run

::: danger Register listeners before run()
Calling `runner.run()` before registering listeners loses events. The message stream begins as soon as the executor starts emitting, and chunks emitted before a listener is attached are not replayed.

Register all listeners *before* invoking `runner.run()`.
:::

::: code-group

```ts [WRONG]
// Too late — execution has already started, you will miss events
const promise = runner.run(rawCtx)
runner.on('message', handler)
await promise
```

```ts [RIGHT]
// Wire first, then trigger the engine
runner.on('message', handler)
runner.observe('turnStart', observer)
await runner.run(rawCtx)
```

:::
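
The effect can be reproduced with a toy emitter. `FakeRunner` below is a hypothetical stand-in, not the ADK runner: it emits every chunk synchronously inside `run()`, which exaggerates the loss. A real runner is asynchronous, so a late listener would more likely miss only the earliest events.

```typescript
type ChunkListener = (delta: string) => void

// Hypothetical stand-in for a runner. It is NOT the ADK TurnRunner.
class FakeRunner {
  private readonly listeners: ChunkListener[] = []

  on(listener: ChunkListener): void {
    this.listeners.push(listener)
  }

  async run(chunks: string[]): Promise<void> {
    for (const chunk of chunks) {
      // Only listeners already registered at emission time receive the chunk.
      this.listeners.forEach((listener) => listener(chunk))
    }
  }
}

function compareRegistrationOrder(): { early: string; late: string } {
  const runner = new FakeRunner()
  let early = ''
  let late = ''

  runner.on((delta) => { early += delta }) // registered before run()
  const done = runner.run(['Hel', 'lo'])
  runner.on((delta) => { late += delta }) // registered after run() was called
  void done
  return { early, late }
}
```

In this toy setup the early listener receives the whole stream and the late listener receives nothing, because nothing is buffered for listeners that attach afterwards.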

## Concrete Wiring Patterns

### Terminal streaming

The simplest functional wiring writes streaming chunks directly to stdout:

```typescript
runner.on('message', (chunk) => {
  process.stdout.write(chunk.aDelta ?? '')
})

runner.on('toolCall', (call) => {
  if (!call.isComplete) {
    process.stderr.write(`\n[tool: ${call.tool}]`)
  }
})

await runner.run({
  turnAbortController: new AbortController(),
  systemPrompt: 'You are a helpful assistant.',
  standingInstructions: [],
})

process.stdout.write('\n')
```

The functional bus carries the product. The observability bus stays empty in this context — no metrics, no spans, no logs. That is valid for a CLI tool.

### SSE endpoint

A server-sent events endpoint feeds the functional bus directly to the client. The shape below uses a generic Node-style request/response pattern — adapt this to your HTTP framework of choice:

```typescript
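import type { TurnStreamableContent, TurnToolCallContent } from '@nhtio/adk'

async function handleChatRequest(req: any, res: any) {
  // SSE clients require this content type; send headers before the first write
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
  })
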
  const onMessage = (chunk: TurnStreamableContent) => {
    res.write(`data: ${JSON.stringify({ type: 'message', ...chunk })}\n\n`)
  }

  const onToolCall = (call: TurnToolCallContent) => {
    res.write(`data: ${JSON.stringify({ type: 'toolCall', ...call })}\n\n`)
  }

  runner.on('message', onMessage)
  runner.on('toolCall', onToolCall)

  try {
    await runner.run(buildRawContext(req))
    res.write('data: {"type":"done"}\n\n')
  } finally {
    runner.off('message', onMessage)
    runner.off('toolCall', onToolCall)
    res.end()
  }
}
```

Register the listeners before `run()`. Remove them in a `finally` block — especially on an HTTP server where the runner persists across requests and listeners accumulate if not cleaned up.
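
The frame formatting in the handler above could be factored into one function. `toSseFrame` is a hypothetical helper, shown as a sketch: it writes `type` last so that a payload field with the same name cannot overwrite the event type.

```typescript
// Hypothetical helper: serializes one functional-bus payload as a single SSE frame.
function toSseFrame(type: string, payload: object): string {
  // JSON.stringify escapes newlines, so the data field always stays on one line.
  // `type` comes after the spread so the payload cannot override it.
  return `data: ${JSON.stringify({ ...payload, type })}\n\n`
}
```

With this helper, a listener body reduces to `res.write(toSseFrame('message', chunk))`.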

### Distributed tracing

The observability bus maps naturally to distributed tracing spans. Wire `turnStart` and `turnEnd` to your tracer:

```typescript
const activeSpans = new Map<string, { end: (attrs: Record<string, unknown>) => void }>()

runner.observe('turnStart', (event) => {
  activeSpans.set(event.turnId, tracer.startSpan('adk.turn', {
    startTime: event.startedAt.toMillis(),
    attributes: { 'adk.turnId': event.turnId },
  }))
})

runner.observe('turnEnd', (event) => {
  const span = activeSpans.get(event.turnId)
  if (span) {
    span.end({ 'adk.durationMs': event.durationMs })
    activeSpans.delete(event.turnId)
  }
})

runner.observe('log', (entry) => {
  if (entry.level === 'error' || entry.level === 'warn') {
    console.error(`[${entry.level}] ${entry.kind}: ${entry.message}`, entry.payload)
  }
})

runner.observe('error', (err) => {
  errorReporter.captureException(err)
})
```

The observability bus feeds the telemetry layer. The functional bus feeds the user. Neither crosses the other.

### Structured per-turn logging

Wire the observability bus to your structured logger:

```typescript
runner.observe('turnStart', ({ turnId }) => {
  logger.info({ turnId }, 'turn started')
})

runner.observe('turnEnd', ({ turnId, durationMs }) => {
  logger.info({ turnId, durationMs }, 'turn ended')
})

runner.observe('log', (entry) => {
  logger[entry.level]({
    dispatchId: entry.dispatchId,
    iteration: entry.iteration,
    kind: entry.kind,
    payload: entry.payload,
  }, entry.message)
})
```

The executor's structured log entries (`helpers.log.info(...)`) arrive here. Use `kind` to filter by event type in your log aggregator. `payload` carries the structured metrics your executor produces per iteration.

## The Buses Are Not Symmetric

The functional bus and the observability bus look similar but behave differently in one critical respect: **functional listeners are part of the delivery contract**.

A `message` listener that is removed breaks the agent for the user. There is no fallback. The functional bus is the only path streaming output takes out of the runner. If the listener is not there, the output goes nowhere.

An `error` observability listener that is removed means you stop seeing errors in your monitoring dashboard. The agent continues running. Users are unaffected. You just have less visibility.

This asymmetry is intentional. It means you can safely add, remove, and modify observability instrumentation without risking the agent. It means functional wiring must be treated with the same care as the executor — it is part of the production contract.

Treat functional listeners as infrastructure. Treat observability listeners as telemetry.
