---
url: 'https://adk.nht.io/assembly/pipelines.md'
description: >-
  Wire turnInputPipeline, turnOutputPipeline, dispatchInputPipeline, and
  dispatchOutputPipeline — the four middleware arrays that surround turn and
  dispatch execution.
---

## LLM summary — Wiring the pipelines

* ADK has four middleware pipeline arrays: `turnInputPipeline`, `turnOutputPipeline`, `dispatchInputPipeline`, `dispatchOutputPipeline`.
* All four are optional and default to `[]`.
* **Context Sets start empty.** `ctx.turnMessages`, `ctx.turnMemories`, `ctx.turnRetrievables`, and related turn-scoped Sets are all empty when a turn begins. ADK does not auto-hydrate them. Your `turnInputPipeline` middleware must call `ctx.fetchMessages()`, iterate the results, and call `ctx.turnMessages.add(m)` for each one. If no middleware does this, the executor sees empty turn-scoped context Sets.
* `turnInputPipeline` runs ONCE per turn, BEFORE the dispatch loop. Use for: hydrating context Sets (messages, memories, retrievables), retrieval, rate limit enforcement, stash initialization, refreshing standing instructions, or injecting stash-derived prompt metadata.
* `turnOutputPipeline` runs ONCE per turn, AFTER the dispatch loop completes SUCCESSFULLY. Use for: memory extraction/write, analytics, webhooks, cleanup. Does NOT run if the turn fails or short-circuits.
* `dispatchInputPipeline` runs BEFORE EACH executor call (each iteration). Use for: iteration cap enforcement, loop detection, corrective instruction injection for repeated tool calls.
* `dispatchOutputPipeline` runs AFTER EACH executor call (each iteration). Use for: per-iteration logging, tool call inspection before the next iteration.
* TurnContext middleware signature: `(ctx: TurnContext, next: NextFn) => void | Promise<void>`; `next` may complete synchronously or asynchronously.
* DispatchContext middleware signature: `(ctx: DispatchContext, next: NextFn) => void | Promise<void>`; `next` may complete synchronously or asynchronously.
* Calling `next()` continues to the next middleware (or the executor). Not calling `next()` short-circuits the pipeline. Turn pipeline short-circuits emit [`E_PIPELINE_SHORT_CIRCUITED`](https://adk.nht.io/api/@nhtio/adk/exceptions/variables/E_PIPELINE_SHORT_CIRCUITED) on the `error` bus and end the turn; dispatch pipeline short-circuits reject dispatch with [`E_PIPELINE_SHORT_CIRCUITED`](https://adk.nht.io/api/@nhtio/adk/exceptions/variables/E_PIPELINE_SHORT_CIRCUITED). Short-circuiting prevents executor execution only in `turnInputPipeline` and `dispatchInputPipeline`.
* HARD RULE: No primary reasoning calls in pipelines. Secondary preprocessing (e.g. query rewriting, classification) is an explicit cost/security exception, not a habit.
* Middleware executes in array order. Later middlewares run after earlier ones call `next()`.
* `ctx.stash` is the designated cross-middleware communication channel. It is a `Registry` instance — use `.get()` and `.set()`, never direct bracket access. Do not pollute the message array with metadata the model doesn't need to see.
* `turnOutputPipeline` does NOT run if the turn fails (executor throws or nacks). Do not put critical cleanup that must run on failure in the output pipeline.
* A minimal iteration cap belongs in `dispatchInputPipeline`: check `ctx.iteration >= MAX`, call `ctx.nack()` if exceeded.

ADK has four middleware pipeline arrays. Pipelines are the sanctioned place to stage context before the executor. Abuse them and your loop becomes opaque.

Dump everything into your executor and you get a bloated monolith where database queries, rate limits, RAG retrieval, and raw reasoning are tangled into one hot knot. That code is hard to debug and harder to test. Pipelines separate preparation from execution.

## The Four Pipelines

### `turnInputPipeline`

Runs **once per turn**, **before** the dispatch loop starts.

Use for:

* **Hydrating context Sets** — loading conversation history, memories, and retrievables into the empty Sets ADK provides (see [Context hydration](#context-hydration) below)
* Running retrieval queries and injecting [`Retrievable`](https://adk.nht.io/api/@nhtio/adk/common/classes/Retrievable) instances
* Enforcing rate limits and access policies — halt here if the user is over quota
* Refreshing standing instructions — call `ctx.refreshStandingInstructions()`, then clear/add entries in `ctx.standingInstructions` as needed — or injecting stash-derived prompt metadata
* Initializing the [`Registry`](https://adk.nht.io/api/@nhtio/adk/common/classes/Registry) in `ctx.stash` with session data

This is where you build the context block. By the time this pipeline exits, the executor's context should contain the material it needs. If you ship an empty input pipeline, the executor starts with empty context sets and will reason about nothing. It will be very confident about that nothing.

### `turnOutputPipeline`

Runs **once per turn**, **after** the dispatch loop completes **successfully**.

::: warning Only runs on success
`turnOutputPipeline` does not run if the turn fails, that is, if the input pipeline or the dispatch loop fails or short-circuits before the output pipeline is reached. Critical cleanup that must run on failure belongs somewhere else.

Note that the `turnEnd` event does not carry error info directly. If you need failure context, pair `turnEnd` with the `error` observability bus for metrics and failure observation. For guaranteed cleanup, use application-level `try/finally` around `runner.run()` or external request lifecycle hooks.
:::

Use for:

* Writing new memories extracted from the turn
* Sending analytics events or audit logs
* Triggering webhooks on turn completion
* Post-turn state cleanup
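For cleanup that has to run even when the turn fails, the warning above recommends `try/finally` around `runner.run()`. A minimal sketch of that idea follows. `runWithGuaranteedCleanup` is a hypothetical helper, not an ADK export, and the `run` and `cleanup` callbacks are assumed to wrap your real `runner.run(...)` call and your own teardown.

```typescript
// Hypothetical helper (not part of ADK): wraps a turn so cleanup always runs.
// `run` would typically be something like () => runner.run(/* your turn input */).
async function runWithGuaranteedCleanup<T>(
  run: () => Promise<T>,
  cleanup: () => Promise<void>,
): Promise<T> {
  try {
    // On success, turnOutputPipeline has already run inside `run`.
    return await run()
  } finally {
    // Runs whether `run` resolves or rejects.
    await cleanup()
  }
}
```

Keep the `finally` path cheap and idempotent: it runs on every turn, including the ones that failed halfway.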

### `dispatchInputPipeline`

Runs **before each executor call** — once per iteration of the dispatch loop.

One turn may have many iterations if the model calls tools. This pipeline runs every time. Use it for:

* **Iteration cap enforcement** — check `ctx.iteration >= MAX_ITERATIONS` and call `ctx.nack()` if exceeded
* **Loop detection** — inspect tool call history and intervene if the model is repeating the same failing strategy
* **Corrective instruction injection** — if `ctx.iteration > 5`, append a message telling the model to take a different approach
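Loop detection can live in a plain function that the middleware calls. A minimal sketch, assuming you can reduce each tool call to a `{ name, args }` record; how you read tool call history off the dispatch context depends on your setup and is not shown here. `ToolCallRecord` and `isRepeatingToolCall` are hypothetical names. The check flags the case where the last `threshold` calls are identical.

```typescript
// Hypothetical shape: one tool call as seen by your loop detector.
interface ToolCallRecord {
  name: string
  args: unknown
}

// Comparison key; assumes args are JSON-serializable.
// Note: JSON.stringify is key-order sensitive, so {a, b} and {b, a} differ.
function signature(call: ToolCallRecord): string {
  return `${call.name}:${JSON.stringify(call.args)}`
}

// True when the most recent `threshold` calls share the same signature,
// i.e. the model keeps retrying the same call with the same arguments.
function isRepeatingToolCall(history: readonly ToolCallRecord[], threshold = 3): boolean {
  if (threshold < 2 || history.length < threshold) return false
  const recent = history.slice(-threshold).map(signature)
  return recent.every((s) => s === recent[0])
}
```

Inside `dispatchInputPipeline`, a `true` result is the trigger to either append a corrective message or call `ctx.nack()`; which of the two you choose is a policy decision.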

### `dispatchOutputPipeline`

Runs **after each executor call** — once per iteration.

Use for:

* Per-iteration logging and token accounting
* Inspecting tool calls produced in this iteration before the next one starts
* Structured per-step audit records
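Per-iteration token accounting can be a small accumulator that a `dispatchOutputPipeline` middleware feeds once per executor call. A minimal sketch: the `IterationUsage` shape is a hypothetical stand-in for whatever usage data your executor reports, `TokenLedger` is not an ADK class, and where you keep the ledger (for example in `ctx.stash`) is up to you.

```typescript
// Hypothetical usage record reported for one executor call.
interface IterationUsage {
  iteration: number
  inputTokens: number
  outputTokens: number
  toolCalls: number
}

// Collects one record per dispatch iteration and exposes running totals.
class TokenLedger {
  private readonly entries: IterationUsage[] = []

  record(usage: IterationUsage): void {
    this.entries.push(usage)
  }

  get iterations(): number {
    return this.entries.length
  }

  totals(): { inputTokens: number; outputTokens: number; toolCalls: number } {
    return this.entries.reduce(
      (acc, e) => ({
        inputTokens: acc.inputTokens + e.inputTokens,
        outputTokens: acc.outputTokens + e.outputTokens,
        toolCalls: acc.toolCalls + e.toolCalls,
      }),
      { inputTokens: 0, outputTokens: 0, toolCalls: 0 },
    )
  }

  // One structured audit line per step, e.g. for a JSON log sink.
  auditLines(): string[] {
    return this.entries.map((e) => JSON.stringify(e))
  }
}
```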

## Context Hydration

ADK will not magically fetch your database records. **ADK does not auto-hydrate context Sets.**

When a turn starts, `ctx.turnMessages`, `ctx.turnMemories`, `ctx.turnRetrievables`, and other turn-scoped context Sets are **empty**. ADK allocates them; ADK does not fill them. `ctx.standingInstructions` is different: it starts from the raw turn context. Hydrating turn-scoped Sets is your job, and the only place to do it is `turnInputPipeline`.

If you ship a `turnInputPipeline: []` — or forget to call `ctx.fetchMessages()` inside it — your executor runs with empty turn message/memory/retrieval Sets. The model has no history. It cannot reason about the conversation. It will hallucinate or give nonsensical responses, and none of it will be obvious from the outside.

The fetch-and-add pattern is mandatory:

```ts
// Canonical turnInputPipeline middleware: load conversation history into the
// turn context before the executor sees it.
//
// ADK does NOT auto-call fetchMessagesCallback. Until a middleware like this
// runs, ctx.turnMessages is an empty Set and the executor reasons about
// nothing. Put this first in turnInputPipeline.

import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'

export const hydrateMessages: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const messages = await ctx.fetchMessages()
  for (const m of messages) {
    ctx.turnMessages.add(m)
  }
  await next()
}
```

Call `ctx.fetchMessages()`. Iterate the result. Call `.add()` on each item. Then call `next()`. That is the complete pattern. There is no shortcut.

The same obligation applies to memories. If your agent uses memories, load them in `turnInputPipeline` and `.add()` them into `ctx.turnMemories`. ADK will not do it for you.
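Because every turn-scoped Set gets the same fetch-and-add treatment, the loop can be factored into a small generic helper. A minimal sketch: `hydrateInto` and `Addable` are hypothetical, not ADK exports, and the loader you pass in (for example a hypothetical `loadMemories(userId)` backed by your own store) is yours to supply. The helper only assumes the target has an `add()` method, which a native `Set` does and which the `.add()` calls on this page suggest ADK's Sets do too.

```typescript
// Minimal structural type: anything with add(item).
interface Addable<T> {
  add(item: T): unknown
}

// Hypothetical helper: fetch items, add each one to the target,
// and return how many items the loader produced.
async function hydrateInto<T>(
  target: Addable<T>,
  load: () => Promise<Iterable<T>>,
): Promise<number> {
  let count = 0
  for (const item of await load()) {
    target.add(item)
    count++
  }
  return count
}
```

In a `turnInputPipeline` middleware this would read as `await hydrateInto(ctx.turnMemories, () => loadMemories(userId))` followed by `await next()`.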

Put hydration middleware **first** in the array. Every other middleware that reads `ctx.turnMessages` depends on it running first.

## Middleware Signature

Turn-level middleware uses the [`TurnPipelineMiddlewareFn`](https://adk.nht.io/api/@nhtio/adk/turn_runner/type-aliases/TurnPipelineMiddlewareFn) type and operates on [`TurnContext`](https://adk.nht.io/api/@nhtio/adk/types/interfaces/TurnContext). Its `next` argument is a `NextFn` and may complete synchronously or asynchronously:

```typescript
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'

const myMiddleware: TurnPipelineMiddlewareFn = async (ctx, next) => {
  // do work before the next step
  await next()
  // do work after the next step returns (if any)
}
```

Dispatch-level middleware uses the [`DispatchPipelineMiddlewareFn`](https://adk.nht.io/api/@nhtio/adk/dispatch_runner/type-aliases/DispatchPipelineMiddlewareFn) type and operates on [`DispatchContext`](https://adk.nht.io/api/@nhtio/adk/types/interfaces/DispatchContext). Its `next` argument is a `NextFn` and may complete synchronously or asynchronously:

```typescript
import type { DispatchPipelineMiddlewareFn } from '@nhtio/adk'

const myDispatchMiddleware: DispatchPipelineMiddlewareFn = async (ctx, next) => {
  await next()
}
```

**You must call `next()`** to continue the pipeline. If you do not call `next()`, the pipeline short-circuits. Turn pipeline short-circuits emit [`E_PIPELINE_SHORT_CIRCUITED`](https://adk.nht.io/api/@nhtio/adk/exceptions/variables/E_PIPELINE_SHORT_CIRCUITED) on the `error` bus and end the turn; dispatch pipeline short-circuits reject dispatch with [`E_PIPELINE_SHORT_CIRCUITED`](https://adk.nht.io/api/@nhtio/adk/exceptions/variables/E_PIPELINE_SHORT_CIRCUITED). Short-circuiting prevents executor execution only in `turnInputPipeline` and `dispatchInputPipeline`; output pipeline short-circuits happen after executor execution. Use this deliberately to kill execution on validation or policy failures.
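To see how array order, `next()`, and short-circuiting interact, here is a toy model of the onion pattern. It is a self-contained illustration with its own `Middleware` type and `runPipeline` function (both hypothetical), not ADK's runner: it does not emit events or reject with `E_PIPELINE_SHORT_CIRCUITED`, it only reports whether the terminal step (standing in for the executor) was reached.

```typescript
type Next = () => Promise<void>
type Middleware<C> = (ctx: C, next: Next) => void | Promise<void>

// Runs middlewares in array order; `terminal` stands in for the executor.
// Resolves to false if some middleware returned without calling next().
// Assumes each middleware awaits next() rather than firing and forgetting it.
async function runPipeline<C>(
  ctx: C,
  middlewares: readonly Middleware<C>[],
  terminal: (ctx: C) => Promise<void>,
): Promise<boolean> {
  let reachedTerminal = false
  const dispatch = async (i: number): Promise<void> => {
    const mw = middlewares[i]
    if (mw === undefined) {
      // Past the last middleware: run the terminal step.
      reachedTerminal = true
      await terminal(ctx)
      return
    }
    // `next` for middleware i is "run everything from i + 1 onward".
    await mw(ctx, () => dispatch(i + 1))
  }
  await dispatch(0)
  return reachedTerminal
}
```

With `[a, b]` the order is a-before, b-before, terminal, b-after, a-after. If `b` returns without calling `next()`, the terminal never runs, `a` still finishes its after-step, and `runPipeline` resolves to `false`.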

## The "No Model in Pipelines" Rule

::: danger No primary reasoning in pipelines
All core LLM reasoning belongs inside the executor. Pipelines are for data loading, policy enforcement, and context staging. Do not run primary reasoning loops from inside a pipeline middleware.

Secondary preprocessing, such as query rewriting for vector search or light classification, is the sole, deliberate exception. Treat it as a costly tradeoff in both money and latency, not a default habit. Every model call in a pipeline adds its own latency and cost on top of the executor's: once per turn in a turn pipeline, once per iteration in a dispatch pipeline. Accept that explicitly.
:::

## Examples

### turnInputPipeline Examples

::: code-group

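```typescript [Message Hydration]
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'

// Load conversation history into ctx.turnMessages. Put this first in turnInputPipeline.
export const hydrateMessages: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const messages = await ctx.fetchMessages()
  for (const m of messages) {
    ctx.turnMessages.add(m)
  }
  await next()
}
```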

```typescript [Rate Limit]
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'

const rateLimitMiddleware: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const userId = ctx.stash.get<string>('userId')
  if (!userId) {
    throw new Error('User ID missing from stash')
  }

  // rateLimiter is an application-provided service, not part of ADK
  const allowed = await rateLimiter.check(userId)
  if (!allowed) {
    throw new Error('Rate limit exceeded')
  }

  await next()
}
```

```typescript [Retrieval Injection]
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'
import { Retrievable } from '@nhtio/adk'

const retrievalMiddleware: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const lastMessage = [...ctx.turnMessages].at(-1)
  const query = lastMessage?.content?.toString() ?? ''
  // vectorStore is your application's vector search client, not part of ADK
  const hits = await vectorStore.search(query, { topK: 5 })

  for (const hit of hits) {
    ctx.turnRetrievables.add(new Retrievable({
      id: hit.id,
      content: hit.text,
      trustTier: 'third-party-public',
      createdAt: new Date(),
      updatedAt: new Date(),
    }))
  }

  await next()
}
```

:::
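The Rate Limit tab assumes an application-provided `rateLimiter` with an async `check(userId)` method. A minimal in-memory fixed-window sketch that satisfies that assumed interface is below. `FixedWindowRateLimiter` and its parameters are hypothetical, and because state lives in one process's memory, a multi-instance deployment would need a shared store (for example Redis) instead.

```typescript
// Hypothetical in-memory fixed-window limiter matching the
// `rateLimiter.check(userId): Promise<boolean>` shape assumed above.
class FixedWindowRateLimiter {
  private readonly windows = new Map<string, { start: number; count: number }>()
  private readonly limit: number
  private readonly windowMs: number
  private readonly now: () => number

  // limit: max allowed checks per window; windowMs: window length in ms;
  // now: injectable clock, defaults to Date.now (handy for tests).
  constructor(limit: number, windowMs: number, now: () => number = Date.now) {
    this.limit = limit
    this.windowMs = windowMs
    this.now = now
  }

  async check(userId: string): Promise<boolean> {
    const t = this.now()
    const w = this.windows.get(userId)
    // Start a fresh window if none exists or the current one has expired.
    if (!w || t - w.start >= this.windowMs) {
      this.windows.set(userId, { start: t, count: 1 })
      return true
    }
    if (w.count >= this.limit) return false
    w.count++
    return true
  }
}

// 20 turns per user per minute (illustrative numbers).
const rateLimiter = new FixedWindowRateLimiter(20, 60_000)
```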

### turnOutputPipeline Examples

::: code-group

```typescript [Memory Write]
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'
import { Memory } from '@nhtio/adk'

const memoryExtractionMiddleware: TurnPipelineMiddlewareFn = async (ctx, next) => {
  await next()  // let downstream output middleware run

  // extractFacts is an application-provided helper, not part of ADK
  const facts = await extractFacts([...ctx.turnMessages])
  for (const fact of facts) {
    await ctx.storeMemory(new Memory({
      id: crypto.randomUUID(),
      content: fact,
      confidence: 0.8,
      importance: 0.6,
      createdAt: new Date(),
      updatedAt: new Date(),
    }))
  }
}
```

:::

### dispatchInputPipeline Examples

::: code-group

```typescript [Iteration Cap]
import type { DispatchPipelineMiddlewareFn } from '@nhtio/adk'

const MAX_ITERATIONS = 10

const iterationCapMiddleware: DispatchPipelineMiddlewareFn = async (ctx, next) => {
  if (ctx.iteration >= MAX_ITERATIONS) {
    ctx.nack(new Error(`Max iterations (${MAX_ITERATIONS}) exceeded`))
    return // short-circuit: do not call next()
  }
  await next()
}
```

:::

## Wiring the Pipelines

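```typescript
import { TurnRunner } from '@nhtio/adk'

// storageCallbacks, myExecutor, and the middlewares referenced below are
// defined in your application; several appear in the examples above.
const runner = new TurnRunner({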
  ...storageCallbacks,
  executorCallback: myExecutor,

  turnInputPipeline: [
    hydrateMessages,     // FIRST: fill ctx.turnMessages — everything else depends on this
    sessionLoader,       // load session state into ctx.stash
    rateLimitMiddleware, // enforce access policy
    retrievalMiddleware, // inject RAG content
    memoryLoader,        // load memories into context
  ],

  turnOutputPipeline: [
    memoryExtractionMiddleware, // write new memories
    analyticsMiddleware,        // send analytics
  ],

  dispatchInputPipeline: [
    iterationCapMiddleware, // kill runaway loops
  ],

  dispatchOutputPipeline: [
    perIterationLogger, // structured per-step log
  ],
})
```

## Ordering and Composition

Middlewares execute in array order. Later middlewares run after earlier ones call `next()`.

Order is not a preference; it is a structural dependency:

* **Hydration goes first.** Any middleware that reads `ctx.turnMessages` will see an empty Set if hydration hasn't run yet.
* A rate limit middleware must come **before** an expensive retrieval middleware — do not waste database CPU or vector search tokens if the user is over quota.
* A session loader must come **before** any middleware that reads the user's ID from the stash.
* A logging middleware that captures the initial state should be **first** after hydration.

The pipeline is an assembly line. Each station depends on the work of the previous one. Get the order wrong and the failures will be silent — an empty Set looks identical to a legitimately empty conversation from the executor's perspective.
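Because a wrong order fails silently, it can pay to make ordering dependencies explicit and check them once at startup. A minimal sketch: the `provides`/`requires` annotations and `assertPipelineOrder` are a hypothetical convention of your own, not something ADK reads. Each entry declares which keys (stash entries or hydrated Sets) it produces and which it needs, and the check fails fast if something is consumed before it is produced.

```typescript
// Hypothetical ordering metadata kept alongside each middleware.
interface StepSpec {
  name: string
  provides?: readonly string[] // keys this step fills, e.g. 'turnMessages', 'userId'
  requires?: readonly string[] // keys this step reads
}

// Throws if any step requires a key that no earlier step provides.
// `initial` lists keys that are available before the pipeline starts.
function assertPipelineOrder(steps: readonly StepSpec[], initial: readonly string[] = []): void {
  const available = new Set(initial)
  for (const step of steps) {
    for (const key of step.requires ?? []) {
      if (!available.has(key)) {
        throw new Error(`Pipeline order error: "${step.name}" requires "${key}" before any step provides it`)
      }
    }
    for (const key of step.provides ?? []) available.add(key)
  }
}

// Mirrors the turnInputPipeline wiring shown earlier on this page.
assertPipelineOrder([
  { name: 'hydrateMessages', provides: ['turnMessages'] },
  { name: 'sessionLoader', provides: ['userId'] },
  { name: 'rateLimitMiddleware', requires: ['userId'] },
  { name: 'retrievalMiddleware', requires: ['turnMessages'] },
])
```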

## Cross-Middleware Communication

Use the `ctx.stash` [`Registry`](https://adk.nht.io/api/@nhtio/adk/common/classes/Registry) to pass data between pipeline steps. `ctx.stash` is a `Registry` instance that persists for the lifetime of the turn. Read and write it with `.get()` and `.set()`; never use direct bracket access.

```typescript
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'

// Application-defined profile shape (illustrative)
interface UserProfile {
  hasAccess: boolean
}

// Middleware A: loads user profile and stores it in stash.
// `db` stands for your application's data layer; it is not part of ADK.
const profileLoader: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const userId = ctx.stash.get<string>('userId')
  if (!userId) throw new Error('User ID missing')
  const profile: UserProfile = await db.getUserProfile(userId)
  ctx.stash.set('userProfile', profile)
  await next()
}

// Middleware B: reads the profile loaded by A
const policyEnforcer: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const profile = ctx.stash.get<UserProfile>('userProfile')
  if (!profile) throw new Error('User profile missing')
  if (!profile.hasAccess) throw new Error('Access denied')
  await next()
}
```

Do not put middleware communication data into the message array. The model reads messages; it does not need to see your session metadata.
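String keys drift: one middleware writes `'userProfile'`, another reads `'userprofile'`, and the `get` quietly returns nothing. One way to tighten this is to declare each key once together with its value type. A minimal sketch: `StashLike`, `StashKey`, `stashKey`, `writeStash`, and `readStash` are hypothetical helpers, and `StashLike` only assumes the `get`/`set` calls this page already makes on `ctx.stash`, so check the actual `Registry` signatures before relying on it.

```typescript
// Minimal structural view of the stash, based only on the get/set calls
// used on this page. The real Registry may have a richer API.
interface StashLike {
  get<T>(key: string): T | undefined
  set(key: string, value: unknown): unknown
}

// A key that carries its value type at compile time (the field is phantom).
interface StashKey<T> {
  readonly name: string
  readonly __type?: T
}

function stashKey<T>(name: string): StashKey<T> {
  return { name }
}

function writeStash<T>(stash: StashLike, key: StashKey<T>, value: T): void {
  stash.set(key.name, value)
}

// Throws instead of returning undefined, so a missing upstream step is loud.
function readStash<T>(stash: StashLike, key: StashKey<T>): T {
  const value = stash.get<T>(key.name)
  if (value === undefined) throw new Error(`Stash key "${key.name}" is missing`)
  return value
}

// Declare keys once and share them between middlewares.
interface UserProfile {
  hasAccess: boolean
}
const USER_PROFILE = stashKey<UserProfile>('userProfile')
```

A middleware then calls `readStash(ctx.stash, USER_PROFILE)` and gets a `UserProfile` back, with a typo in the key name becoming a compile error instead of a silent `undefined`.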
