ADK has four middleware pipeline arrays. Pipelines are the sanctioned place to stage context before the executor. Abuse them and your loop becomes opaque.
Dump everything into your executor and you get a bloated monolith where database queries, rate limits, RAG retrieval, and raw reasoning are tangled into one hot knot. That code is hard to debug and harder to test. Pipelines separate preparation from execution.
The Four Pipelines
turnInputPipeline
Runs once per turn, before the dispatch loop starts.
Use for:
- Hydrating context Sets: loading conversation history, memories, and retrievables into the empty Sets ADK provides (see Context Hydration below)
- Running retrieval queries and injecting Retrievable instances
- Enforcing rate limits and access policies: halt here if the user is over quota
- Refreshing standing instructions (call ctx.refreshStandingInstructions(), then clear/add entries in ctx.standingInstructions as needed) or injecting stash-derived prompt metadata
- Initializing the Registry in ctx.stash with session data
This is where you build the context block. By the time this pipeline exits, the executor's context should contain the material it needs. If you ship an empty input pipeline, the executor starts with empty context sets and will reason about nothing. It will be very confident about that nothing.
turnOutputPipeline
Runs once per turn, after the dispatch loop completes successfully.
Only runs on success
turnOutputPipeline does not run if the turn fails, that is, if the input pipeline or the dispatch loop fails or short-circuits before the output pipeline is reached. Critical cleanup that must run on failure belongs somewhere else.
Note that the turnEnd event does not carry error info directly. If you need failure context, pair turnEnd with the error observability bus for metrics and failure observation. For guaranteed cleanup, use application-level try/finally around runner.run() or external request lifecycle hooks.
Use for:
- Writing new memories extracted from the turn
- Sending analytics events or audit logs
- Triggering webhooks on turn completion
- Post-turn state cleanup
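The guaranteed-cleanup advice above can be sketched as a small wrapper around the run call. This is a minimal sketch, not ADK code: `RunnerLike` is a local stand-in that assumes only the `run()` method mentioned above (the real argument and return types may differ), and `runWithCleanup` is a hypothetical helper name.

```typescript
// Local stand-in: assumes only that the runner exposes an async run() method.
// The real runner's argument and return types may differ.
interface RunnerLike<I, O> {
  run(input: I): Promise<O>
}

// Hypothetical helper: run a turn and always execute `cleanup` afterwards.
async function runWithCleanup<I, O>(
  runner: RunnerLike<I, O>,
  input: I,
  cleanup: () => void | Promise<void>,
): Promise<O> {
  try {
    return await runner.run(input)
  } finally {
    // Executes whether run() resolved or rejected.
    await cleanup()
  }
}
```

Because the cleanup lives in `finally`, it does not depend on turnOutputPipeline being reached. Keep success-only work (memory writes, analytics) in the output pipeline and reserve this wrapper for resources that must be released either way.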
dispatchInputPipeline
Runs before each executor call — once per iteration of the dispatch loop.
One turn may have many iterations if the model calls tools. This pipeline runs every time. Use it for:
- Iteration cap enforcement: check ctx.iteration >= MAX_ITERATIONS and call ctx.nack() if exceeded
- Loop detection: inspect tool call history and intervene if the model is repeating the same failing strategy
- Corrective instruction injection: if ctx.iteration > 5, append a message telling the model to take a different approach
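The loop detection item above can be sketched as a pure function that a dispatchInputPipeline middleware might call. This is an illustrative sketch under assumptions: `ToolCallRecord` is a hypothetical shape (ADK's actual tool call history type is not shown here and may differ), and `isRepeatingFailure` is a made-up helper name.

```typescript
// Hypothetical record shape; the real tool call history type may differ.
interface ToolCallRecord {
  tool: string
  args: string // serialized arguments, e.g. JSON
  ok: boolean // false when the call failed
}

// Returns true when the last `threshold` calls all failed and were identical
// (same tool, same serialized arguments).
function isRepeatingFailure(
  history: readonly ToolCallRecord[],
  threshold = 3,
): boolean {
  if (threshold < 1 || history.length < threshold) return false
  const tail = history.slice(-threshold)
  const first = tail[0]
  if (first === undefined) return false
  return tail.every(
    (call) => !call.ok && call.tool === first.tool && call.args === first.args,
  )
}
```

Keeping the check pure makes it easy to unit test. The middleware that calls it decides how to intervene: call ctx.nack(), or append a corrective message and continue.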
dispatchOutputPipeline
Runs after each executor call — once per iteration.
Use for:
- Per-iteration logging and token accounting
- Inspecting tool calls produced in this iteration before the next one starts
- Structured per-step audit records
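A structured per-step record, as listed above, might look like the following. This is a sketch with local stand-in types rather than ADK's own: only `ctx.iteration` comes from this page, while `StepRecord`, `makeStepLogger`, and the in-memory `sink` are hypothetical names chosen for illustration.

```typescript
// Local stand-ins, not ADK types. Only `iteration` is taken from the text above.
interface DispatchContextLike {
  iteration: number
}
type NextFnLike = () => void | Promise<void>
type DispatchMiddlewareLike = (
  ctx: DispatchContextLike,
  next: NextFnLike,
) => Promise<void>

interface StepRecord {
  iteration: number
  loggedAt: string // ISO 8601 timestamp
}

// Builds a middleware that appends one record per iteration to `sink`,
// then hands control to the next middleware.
function makeStepLogger(sink: StepRecord[]): DispatchMiddlewareLike {
  return async (ctx, next) => {
    sink.push({ iteration: ctx.iteration, loggedAt: new Date().toISOString() })
    await next()
  }
}
```

In a real deployment the sink would more likely be a logger or metrics client than an array; injecting it through a factory keeps the middleware testable.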
Context Hydration
ADK will not magically fetch your database records. ADK does not auto-hydrate context Sets.
When a turn starts, ctx.turnMessages, ctx.turnMemories, ctx.turnRetrievables, and other turn-scoped context Sets are empty. ADK allocates them; ADK does not fill them. ctx.standingInstructions is different: it starts from the raw turn context. Hydrating turn-scoped Sets is your job, and the only place to do it is turnInputPipeline.
If you ship a turnInputPipeline: [] — or forget to call ctx.fetchMessages() inside it — your executor runs with empty turn message/memory/retrieval Sets. The model has no history. It cannot reason about the conversation. It will hallucinate or give nonsensical responses, and none of it will be obvious from the outside.
The fetch-and-add pattern is mandatory:
// Canonical turnInputPipeline middleware: load conversation history into the
// turn context before the executor sees it.
//
// ADK does NOT auto-call fetchMessagesCallback. Until a middleware like this
// runs, ctx.turnMessages is an empty Set and the executor reasons about
// nothing. Put this first in turnInputPipeline.
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'
export const hydrateMessages: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const messages = await ctx.fetchMessages()
  for (const m of messages) {
    ctx.turnMessages.add(m)
  }
  await next()
}
Call ctx.fetchMessages(). Iterate the result. Call .add() on each item. Then call next(). That is the complete pattern. There is no shortcut.
The same obligation applies to memories. If your agent uses memories, load them in turnInputPipeline and .add() them into ctx.turnMemories. ADK will not do it for you.
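The same fetch-and-add shape for memories could be sketched like this. The types are local stand-ins so the example runs on its own; only `ctx.turnMemories` being a Set comes from this page. `loadMemories` is a hypothetical application-supplied loader, since the ADK call for fetching memories is not shown here.

```typescript
// Local stand-ins, not ADK types. Only `turnMemories` as a Set is from the text.
interface TurnContextLike<M> {
  turnMemories: Set<M>
}
type NextFnLike = () => void | Promise<void>

// `loadMemories` is a hypothetical loader supplied by the application
// (database query, cache lookup, and so on).
function makeMemoryLoader<M>(loadMemories: () => Promise<M[]>) {
  return async (ctx: TurnContextLike<M>, next: NextFnLike): Promise<void> => {
    const memories = await loadMemories()
    for (const m of memories) {
      ctx.turnMemories.add(m)
    }
    // Downstream middleware and the executor now see the hydrated Set.
    await next()
  }
}
```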
Put hydration middleware first in the array. Every other middleware that reads ctx.turnMessages depends on it running first.
Middleware Signature
Turn-level middleware uses the TurnPipelineMiddlewareFn type and operates on TurnContext. Its next argument is a NextFn and may complete synchronously or asynchronously:
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'
const myMiddleware: TurnPipelineMiddlewareFn = async (ctx, next) => {
  // do work before the next step
  await next()
  // do work after the next step returns (if any)
}
Dispatch-level middleware uses the DispatchPipelineMiddlewareFn type and operates on DispatchContext. Its next argument is a NextFn and may complete synchronously or asynchronously:
import type { DispatchPipelineMiddlewareFn } from '@nhtio/adk'
const myDispatchMiddleware: DispatchPipelineMiddlewareFn = async (ctx, next) => {
  await next()
}
You must call next() to continue the pipeline. If you do not call next(), the pipeline short-circuits. Turn pipeline short-circuits emit E_PIPELINE_SHORT_CIRCUITED on the error bus and end the turn; dispatch pipeline short-circuits reject dispatch with E_PIPELINE_SHORT_CIRCUITED. Short-circuiting prevents executor execution only in turnInputPipeline and dispatchInputPipeline; output pipeline short-circuits happen after executor execution. Use this deliberately to kill execution on validation or policy failures.
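To make the next() contract concrete, here is a toy pipeline composer. It is not ADK's implementation and it does not emit E_PIPELINE_SHORT_CIRCUITED; it only illustrates, under simplified assumptions, how array-order execution and short-circuit detection can work. All names (`Middleware`, `runPipeline`) are hypothetical.

```typescript
type Next = () => void | Promise<void>
type Middleware<C> = (ctx: C, next: Next) => void | Promise<void>

// Runs middlewares in array order. Resolves to true when every middleware
// called next(), or false when one returned without calling it.
// Simplification: it does not guard against next() being called twice.
async function runPipeline<C>(
  ctx: C,
  middlewares: readonly Middleware<C>[],
): Promise<boolean> {
  let reachedEnd = false
  const dispatch = async (index: number): Promise<void> => {
    const mw = middlewares[index]
    if (mw === undefined) {
      // Walked past the last middleware: nobody short-circuited.
      reachedEnd = true
      return
    }
    await mw(ctx, () => dispatch(index + 1))
  }
  await dispatch(0)
  return reachedEnd
}
```

Code placed after `await next()` in an earlier middleware still runs when a later middleware short-circuits, because the earlier call simply returns. Only the middlewares after the short-circuit point are skipped.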
The "No Model in Pipelines" Rule
No primary reasoning in pipelines
All core LLM reasoning belongs inside the executor. Pipelines are for data loading, policy enforcement, and context staging. Do not run primary reasoning loops from inside a pipeline middleware.
Secondary preprocessing, such as query rewriting for vector search or light classification, is the sole, deliberate exception. Treat it as an expensive cost and latency tradeoff, not a default habit. If you call a model in a pipeline, you pay for an extra model round trip in latency and cost on top of the executor's own calls. Accept that explicitly.
Examples
turnInputPipeline Examples
<<< @/snippets/hydrate_messages.ts

import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'

// rateLimiter is assumed to be an application-level client defined elsewhere.
const rateLimitMiddleware: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const userId = ctx.stash.get<string>('userId')
  if (!userId) {
    throw new Error('User ID missing from stash')
  }
  const allowed = await rateLimiter.check(userId)
  if (!allowed) {
    throw new Error('Rate limit exceeded')
  }
  await next()
}

import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'
import { Retrievable } from '@nhtio/adk'

// vectorStore is assumed to be an application-level search client defined elsewhere.
const retrievalMiddleware: TurnPipelineMiddlewareFn = async (ctx, next) => {
  // Use the most recent message as the search query.
  const lastMessage = [...ctx.turnMessages].at(-1)
  const query = lastMessage?.content?.toString() ?? ''
  const hits = await vectorStore.search(query, { topK: 5 })
  for (const hit of hits) {
    ctx.turnRetrievables.add(new Retrievable({
      id: hit.id,
      content: hit.text,
      trustTier: 'third-party-public',
      createdAt: new Date(),
      updatedAt: new Date(),
    }))
  }
  await next()
}
turnOutputPipeline Examples
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'
import { Memory } from '@nhtio/adk'

const memoryExtractionMiddleware: TurnPipelineMiddlewareFn = async (ctx, next) => {
  await next() // let downstream output middleware run
  const facts = await extractFacts([...ctx.turnMessages])
  for (const fact of facts) {
    await ctx.storeMemory(new Memory({
      id: crypto.randomUUID(),
      content: fact,
      confidence: 0.8,
      importance: 0.6,
      createdAt: new Date(),
      updatedAt: new Date(),
    }))
  }
}
dispatchInputPipeline Examples
import type { DispatchPipelineMiddlewareFn } from '@nhtio/adk'

const MAX_ITERATIONS = 10

const iterationCapMiddleware: DispatchPipelineMiddlewareFn = async (ctx, next) => {
  if (ctx.iteration >= MAX_ITERATIONS) {
    ctx.nack(new Error(`Max iterations (${MAX_ITERATIONS}) exceeded`))
    return // short-circuit: do not call next()
  }
  await next()
}
Wiring the Pipelines
const runner = new TurnRunner({
  ...storageCallbacks,
  executorCallback: myExecutor,
  turnInputPipeline: [
    hydrateMessages, // FIRST: fill ctx.turnMessages; everything else depends on this
    sessionLoader, // load session state into ctx.stash
    rateLimitMiddleware, // enforce access policy
    retrievalMiddleware, // inject RAG content
    memoryLoader, // load memories into context
  ],
  turnOutputPipeline: [
    memoryExtractionMiddleware, // write new memories
    analyticsMiddleware, // send analytics
  ],
  dispatchInputPipeline: [
    iterationCapMiddleware, // kill runaway loops
  ],
  dispatchOutputPipeline: [
    perIterationLogger, // structured per-step log
  ],
})
Ordering and Composition
Middlewares execute in array order. Later middlewares run after earlier ones call next().
Order is not a preference; it is a structural dependency:
- Hydration goes first. Any middleware that reads ctx.turnMessages will see an empty Set if hydration hasn't run yet.
- A rate limit middleware must come before an expensive retrieval middleware: do not waste database CPU or vector search tokens if the user is over quota.
- A session loader must come before any middleware that reads the user's ID from the stash.
- A logging middleware that captures the initial state should be first after hydration.
The pipeline is an assembly line. Each station depends on the work of the previous one. Get the order wrong and the failures will be silent — an empty Set looks identical to a legitimately empty conversation from the executor's perspective.
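Because ordering mistakes fail silently, one option is to check the ordering rules once at wiring time. The helper below is a hypothetical sketch, not an ADK feature: `assertOrder` compares middleware references by identity and throws if a required "before/after" pair is missing or reversed.

```typescript
// Hypothetical wiring-time check. Each rule is a [before, after] pair.
// Throws if either entry is missing from the pipeline or if `before`
// does not appear earlier in the array than `after`.
function assertOrder<T>(
  pipeline: readonly T[],
  rules: ReadonlyArray<readonly [T, T]>,
): void {
  for (const [before, after] of rules) {
    const i = pipeline.indexOf(before)
    const j = pipeline.indexOf(after)
    if (i === -1 || j === -1) {
      throw new Error('Ordering rule references a middleware that is not in the pipeline')
    }
    if (i >= j) {
      throw new Error(`Pipeline order violated: entry at index ${i} must come before entry at index ${j}`)
    }
  }
}
```

Called next to the runner construction with pairs such as hydration before retrieval and rate limiting before retrieval, this turns a silent empty-Set bug into a startup error.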
Cross-Middleware Communication
Use the ctx.stash Registry to pass data between pipeline steps. ctx.stash is a Registry instance on the context that persists for the lifetime of the turn. Read and write it through get() and set(); do not use direct bracket access on the stash.
import type { TurnPipelineMiddlewareFn } from '@nhtio/adk'

// db and UserProfile are assumed to be application-level definitions.

// Middleware A: loads user profile and stores it in stash
const profileLoader: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const userId = ctx.stash.get<string>('userId')
  if (!userId) throw new Error('User ID missing')
  const profile = await db.getUserProfile(userId)
  ctx.stash.set('userProfile', profile)
  await next()
}

// Middleware B: reads the profile loaded by A
const policyEnforcer: TurnPipelineMiddlewareFn = async (ctx, next) => {
  const profile = ctx.stash.get<UserProfile>('userProfile')
  if (!profile) throw new Error('User profile missing')
  if (!profile.hasAccess) throw new Error('Access denied')
  await next()
}
Do not put middleware communication data into the message array. The model reads messages; it does not need to see your session metadata.
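The repeated "get, then throw if missing" step in the middlewares above could be factored into a small helper. This is a sketch under assumptions: `StashLike` is a local stand-in that models only the get()/set() calls used on this page, not the real Registry type, and `requireStash` is a hypothetical name.

```typescript
// Local stand-in for the stash: models only the get()/set() calls used above.
interface StashLike {
  get<T>(key: string): T | undefined
  set(key: string, value: unknown): void
}

// Hypothetical helper: return a required stash value, or throw an error
// that names the missing key.
function requireStash<T>(stash: StashLike, key: string): T {
  const value = stash.get<T>(key)
  if (value === undefined) {
    throw new Error(`Missing required stash key: ${key}`)
  }
  return value
}
```

A missing key usually means an upstream middleware did not run or ran too late, so an error that names the key points straight at the ordering problem.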