Layer 2 control loops - driving multi-step inference, tool calls, and structured output.
The orchestrate package (layer 2) drives work across many inference steps.
It breaks requests into steps, routes work across subsystems, enforces ordering and state transitions,
and manages retries, branching, and tool loops. A pagantic system is not one inference call -
it is a controlled multi-step loop.
Four orchestration patterns serve different use cases:
| Pattern | State | Purpose |
|---|---|---|
| AgentLoop | Stateful | Multi-turn conversational agent with tool loop |
| SpecializedLoop | Stateless | Single-shot structured output extraction |
| PlanExecutor | Stateless | Multi-step typed pipeline (retrieve, rerank, infer, validate) |
| RedundantLoop | Stateless | N-version inference with voting for reliability |
The package lives at github.com/miroslav-matejovsky/pagantic/layers/02_orchestrate and is typically imported as orchestrate.
Every orchestration pattern follows the execution lifecycle state machine. Each pattern traverses a subset of states. Understanding which states a pattern uses helps reason about failure handling, retry behavior, and observability events.
| Pattern | States Traversed | Retry Path | Tool Loop |
|---|---|---|---|
| AgentLoop | INIT -> PREPARE -> EXECUTE -> COMPLETE | EXECUTE -> VALIDATE -> EXECUTE (per turn) | Yes, within EXECUTE |
| SpecializedLoop | INIT -> PREPARE -> EXECUTE -> VALIDATE -> COMPLETE | VALIDATE -> EXECUTE | No |
| PlanExecutor | INIT -> PLAN -> PREPARE -> EXECUTE -> VALIDATE -> COMPLETE | Per-step: EXECUTE -> VALIDATE -> EXECUTE | Per step |
| RedundantLoop | INIT -> PREPARE -> EXECUTE -> VALIDATE -> COMPLETE | N/A (voting replaces retry) | No |
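The transitions implied by the table can be sketched as a small lookup in Go. This is a hypothetical illustration, not the package's implementation: the State type, the constant names, canTransition, and ValidPath are invented here, the edge set is read off the two columns above, and treating ERROR as reachable from any non-terminal state is an assumption.

```go
package main

// State is a hypothetical mirror of the lifecycle states in the table above.
type State string

const (
	StateInit     State = "INIT"
	StatePlan     State = "PLAN"
	StatePrepare  State = "PREPARE"
	StateExecute  State = "EXECUTE"
	StateValidate State = "VALIDATE"
	StateComplete State = "COMPLETE"
	StateError    State = "ERROR"
)

// transitions holds the edges implied by the "States Traversed" and
// "Retry Path" columns; VALIDATE -> EXECUTE is the retry edge.
var transitions = map[State][]State{
	StateInit:     {StatePlan, StatePrepare},
	StatePlan:     {StatePrepare},
	StatePrepare:  {StateExecute},
	StateExecute:  {StateValidate, StateComplete},
	StateValidate: {StateExecute, StateComplete},
}

// canTransition reports whether from -> to is a legal edge. ERROR is assumed
// reachable from any non-terminal state.
func canTransition(from, to State) bool {
	if to == StateError {
		return from != StateComplete && from != StateError
	}
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

// ValidPath reports whether path starts at INIT, uses only legal edges, and
// ends in a terminal state (COMPLETE or ERROR).
func ValidPath(path []State) bool {
	if len(path) == 0 || path[0] != StateInit {
		return false
	}
	for i := 1; i < len(path); i++ {
		if !canTransition(path[i-1], path[i]) {
			return false
		}
	}
	last := path[len(path)-1]
	return last == StateComplete || last == StateError
}
```

Under this reading, AgentLoop's INIT -> PREPARE -> EXECUTE -> COMPLETE and a SpecializedLoop run with one VALIDATE -> EXECUTE retry both pass ValidPath, while a path that jumps from INIT straight to EXECUTE does not.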
Each pattern can emit and recover from specific failure categories:
| Pattern | Can Emit | Can Recover From |
|---|---|---|
| AgentLoop | InferenceFailure, ToolFailure, Cancellation | ToolFailure (retry tool call) |
| SpecializedLoop | InferenceFailure, ConstraintFailure, ValidationFailure, Cancellation | ConstraintFailure (repair), ValidationFailure (retry) |
| PlanExecutor | All categories | ConstraintFailure, ValidationFailure (per step), ToolFailure (if step uses tools) |
| RedundantLoop | InferenceFailure, ConstraintFailure, OrchestrationFailure, Cancellation | InferenceFailure (other replicas succeed), ConstraintFailure (repair per replica) |
Retry attempts are counted in ExecutionState.Attempt. When max attempts are exhausted, the state
transitions to ERROR with the last failure category.
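A minimal sketch of that retry accounting, assuming a simplified ExecutionState with hypothetical State, Attempt, and LastFailure fields (the real type is not shown in this section) and a caller-supplied step function:

```go
package main

import "errors"

// ExecutionState is a hypothetical stand-in holding only the fields needed
// to illustrate retry accounting.
type ExecutionState struct {
	State       string // "COMPLETE" or "ERROR" once RunWithRetry returns
	Attempt     int    // attempts made so far
	LastFailure error  // most recent failure; kept when attempts run out
}

// ErrValidation is a hypothetical failure used to exercise the retry path.
var ErrValidation = errors.New("validation failure")

// RunWithRetry calls step until it succeeds or maxAttempts is exhausted,
// incrementing Attempt before every try. With maxAttempts <= 0 no attempt
// is made and the state goes straight to ERROR.
func RunWithRetry(maxAttempts int, step func(attempt int) error) ExecutionState {
	st := ExecutionState{State: "EXECUTE"}
	for st.Attempt < maxAttempts {
		st.Attempt++
		if err := step(st.Attempt); err != nil {
			st.LastFailure = err // remember why this attempt failed
			continue
		}
		st.State = "COMPLETE"
		st.LastFailure = nil
		return st
	}
	st.State = "ERROR"
	return st
}
```

Keeping LastFailure on the state is what lets the ERROR transition report the last failure category rather than a generic error.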
The primary pattern for conversational agents. AgentLoop maintains message
history across Chat() calls using an internal ConversationBuffer.
Each call appends the user message, runs the tool loop if needed, and stores the final assistant
response - building up a full conversation over time.
Every Chat() call follows this sequence:
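1. If ContextProvider is set, context is retrieved ephemerally (not stored in history).
2. The user message is appended to the ConversationBuffer.
3. Inference runs; any tool calls the model makes are executed and their results fed back until the model returns a final answer. MaxToolIterations prevents infinite tool loops (default: 20).
4. The final assistant response is stored in the buffer.

type LoopConfig struct {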
Engine inference.Engine // required - the LLM backend
Tools *tool.Registry // optional - enables tool calls
SystemPrompt string // prepended as system message
Grammar string // GBNF grammar; only for ChatStructured, not Chat
MaxTokens int // default 2048
MaxToolIterations int // default 20; max tool-call loop rounds
Stream *inference.StreamHandler // streaming callbacks for token-by-token output
OnToolResult func(name, output string) // hook called after each tool execution
Observer observe.EventLog // event recording for tracing/debugging
ContextProvider ContextProvider // optional - per-turn context retrieval
}
| Field | Default | Notes |
|---|---|---|
| Engine | - | Required. Any type satisfying inference.Engine. |
| Tools | nil | When nil, the model cannot make tool calls. |
| SystemPrompt | "" | Inserted as the first message in the buffer at construction time. |
| Grammar | "" | GBNF grammar string. Only applied in ChatStructured. |
| MaxTokens | 2048 | Maximum tokens per inference call. |
| MaxToolIterations | 20 | Safety limit on tool-call loop rounds. |
| Stream | nil | Passed to the engine via WithStreamHandler if the engine supports it. |
| OnToolResult | nil | Called with the tool name and output string after each execution. |
| Observer | nil | Records events for "infer", "tool", and "context" actions. |
| ContextProvider | nil | Retrieves context messages per turn. See Ephemeral Context. |
// Create agent with tools and context
agent := orchestrate.NewAgentLoop(orchestrate.LoopConfig{
Engine: engine,
Tools: tool.NewRegistry(&searchTool{}, &calcTool{}),
SystemPrompt: "You are a helpful assistant with access to tools.",
ContextProvider: contextBuilder, // satisfies ContextProvider interface
Stream: tui.TerminalRenderer(os.Stdout),
Observer: &observe.InMemoryEventLog{},
})
// Multi-turn conversation - state accumulates across calls
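result1, err := agent.Chat(ctx, "What is the weather in Paris?")
if err != nil {
log.Fatal(err)
}
fmt.Println(result1.Content) // agent uses search tool, returns answer
result2, err := agent.Chat(ctx, "How about London?")
if err != nil {
log.Fatal(err)
}
fmt.Println(result2.Content) // history lets the agent read this as a weather question about London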
// Access full conversation history
messages := agent.Messages()
// [system, user("Paris?"), assistant("Paris is..."), user("London?"), assistant("London is...")]
The Messages() method returns a copy of the conversation state.
Use it to inspect history, serialize it for persistence, or pass it to other systems.
Context retrieved via ContextProvider is ephemeral - it is injected into
the inference request but never stored in the conversation buffer. This prevents
retrieved context from accumulating across turns, which would bloat the context window and
potentially confuse the model with stale information.
// Internally, AgentLoop does this each Chat() call:
// 1. Retrieve fresh context for the current user message
ctxMsgs := al.fetchContext(ctx, userMessage)
// 2. Store user message in persistent memory
al.memory.Append(core.NewUserMessage(userMessage))
// 3. Build request: memory + ephemeral context injected before user message
requestMsgs := al.memoryWithEphemeralContext(ctxMsgs)
// ctxMsgs appear in the request but are NOT in al.memory
// 4. After inference, storeResult rebuilds memory from the base snapshot,
// ignoring ephemeral context entirely
al.storeResult(memBase, result)
If ContextProvider.Build() returns an error, the agent
degrades gracefully - it proceeds without context and records the error via the Observer.
A context-retrieval failure never fails the inference call.
The memoryWithEphemeralContext method inserts context messages just before the
last message (the user message) in the buffer. This placement ensures the model sees relevant
context immediately before the user query, matching the standard RAG pattern.
ChatStructured forces the model to produce JSON conforming to a given schema.
It applies several layers of constraint enforcement:
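- Thinking is disabled for the structured call (enable_thinking: false).
- Malformed JSON is repaired with constraint.RepairJSON.
- Enum values are normalized with constraint.NormalizeEnumValues.

schema := core.Schema{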
Type: "object",
Properties: map[string]core.Schema{
"sentiment": {Type: "string", Enum: []string{"positive", "negative", "neutral"}},
"confidence": {Type: "number"},
},
Required: []string{"sentiment", "confidence"},
}
result, err := agent.ChatStructured(ctx, "Analyze: great product!", schema)
// result.Content is valid JSON: {"sentiment":"positive","confidence":0.95}
The enforcement order is: a json.Valid check, then RepairJSON if the output is invalid, then
NormalizeEnumValues, and finally SchemaValidator. Each step is
applied only when needed. The constraint package (layer 5) provides all repair
and validation logic.
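The order above can be illustrated with a self-contained sketch using only encoding/json. Everything here is hypothetical: repairJSON is a deliberately naive stand-in for constraint.RepairJSON (it just keeps the span from the first '{' to the last '}'), normalizeEnum is a case-insensitive stand-in for NormalizeEnumValues, and the final checks cover only required keys and enum membership rather than full schema validation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// repairJSON (naive, hypothetical) drops prose or code fences around an object.
func repairJSON(s string) string {
	start, end := strings.Index(s, "{"), strings.LastIndex(s, "}")
	if start < 0 || end < start {
		return s
	}
	return s[start : end+1]
}

// normalizeEnum maps v onto an allowed value, ignoring case and whitespace.
func normalizeEnum(v string, allowed []string) string {
	for _, a := range allowed {
		if strings.EqualFold(strings.TrimSpace(v), a) {
			return a
		}
	}
	return v
}

func contains(list []string, v string) bool {
	for _, x := range list {
		if x == v {
			return true
		}
	}
	return false
}

// Enforce applies: json.Valid, repair only if invalid, enum normalization,
// then validation of required keys and enum membership.
func Enforce(raw string, required []string, enums map[string][]string) (string, error) {
	out := raw
	if !json.Valid([]byte(out)) {
		out = repairJSON(out)
		if !json.Valid([]byte(out)) {
			return "", fmt.Errorf("output is not valid JSON after repair")
		}
	}
	var obj map[string]any
	if err := json.Unmarshal([]byte(out), &obj); err != nil {
		return "", fmt.Errorf("expected a JSON object: %w", err)
	}
	changed := false
	for field, allowed := range enums {
		if s, ok := obj[field].(string); ok {
			if n := normalizeEnum(s, allowed); n != s {
				obj[field] = n
				changed = true
			}
		}
	}
	for _, field := range required {
		if _, ok := obj[field]; !ok {
			return "", fmt.Errorf("missing required field %q", field)
		}
	}
	for field, allowed := range enums {
		if s, ok := obj[field].(string); ok && !contains(allowed, s) {
			return "", fmt.Errorf("field %q: %q is not an allowed value", field, s)
		}
	}
	if changed { // re-encode only when normalization altered something
		b, err := json.Marshal(obj)
		if err != nil {
			return "", err
		}
		out = string(b)
	}
	return out, nil
}
```

Validation runs after normalization, so an answer like "Positive" is accepted once it has been mapped onto the enum value "positive".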
The Grammar field in LoopConfig is only
applied in ChatStructured, never in Chat. This is intentional -
GBNF grammars constrain output format, which only makes sense for structured extraction.
AgentLoop uses three distinct memory categories. Understanding which artifacts are persisted and which are ephemeral is critical for correct multi-turn behavior.
| Category | Lifetime | Contents |
|---|---|---|
| ConversationMemory | Persisted across turns | User and assistant messages in ConversationBuffer. Grows with each Chat() call. |
| WorkingMemory | Per-turn transient | Retrieved context blocks, tool results, intermediate inference artifacts. Discarded after each turn. |
| SessionState | Durable across turns | Key-value decisions that carry forward (e.g., user preferences, resolved entities). Thread-safe. |
SpecializedLoop is for one-off structured output tasks. It creates a fresh
AgentLoop per Call() - no state accumulates between calls.
Every invocation starts with a clean slate.
Each Call() proceeds as follows:
1. A fresh inner AgentLoop is created (no history).
2. If ContextProvider is set: context is retrieved using the original prompt and injected directly into the inner loop via injectContext().
3. If Tools are set: Chat(prompt) runs first (tool phase), then ChatStructured(phase2Prompt, schema).
4. Without Tools: ChatStructured(prompt, schema) runs directly.

In the tool path, phase2Prompt is "Produce your structured output now."
type SpecializedConfig struct {
Engine inference.Engine // required - the LLM backend
Tools *tool.Registry // optional - enables two-phase execution
SystemPrompt string // system prompt for inner AgentLoop
Schema core.Schema // required - JSON schema for output
Grammar string // GBNF grammar; empty means none
MaxTokens int // default 2048
Stream *inference.StreamHandler // streaming callbacks
Observer observe.EventLog // event recording
ContextProvider ContextProvider // optional - retrieves context using original prompt
}
// Basic single-shot structured extraction
sl := orchestrate.NewSpecializedLoop(orchestrate.SpecializedConfig{
Engine: engine,
SystemPrompt: "Analyze sentiment. Return structured JSON.",
Schema: sentimentSchema,
Grammar: `root ::= "{" ws "\"sentiment\"" ws ":" ws val ws "}"
ws ::= [ \t\n]*
val ::= "\"positive\"" | "\"negative\"" | "\"neutral\""`,
Stream: tui.TerminalRenderer(os.Stdout),
})
result, err := sl.Call(ctx, "I love this product!")
// result.Content: {"sentiment":"positive"}
// With tools - two-phase execution
sl := orchestrate.NewSpecializedLoop(orchestrate.SpecializedConfig{
Engine: engine,
Tools: tool.NewRegistry(&webSearch{}, &calculator{}),
SystemPrompt: "Research the topic, then produce a structured summary.",
Schema: core.Schema{
Type: "object",
Properties: map[string]core.Schema{
"summary": {Type: "string"},
"sources": {Type: "array", Items: &core.Schema{Type: "string"}},
},
Required: []string{"summary", "sources"},
},
})
result, err := sl.Call(ctx, "Latest advances in quantum computing")
// Phase 1: model calls webSearch tool to gather info
// Phase 2: model produces structured JSON summary
| Use SpecializedLoop when... | Use AgentLoop instead when... |
|---|---|
| Single-shot classification or extraction | Multi-turn conversation needed |
| Schema-constrained JSON output required | Free-form text responses needed |
| No state between calls | Conversation history matters |
| GBNF grammar enforcement needed | Grammar not needed for free chat |
| Tools + structured output in one call | Tool use without structured output |
SpecializedLoop is stateless - it creates a fresh inner loop for each call, so all memory is transient: the inner conversation, retrieved context, and tool results are discarded when Call() returns.
A Planner creates an ExecutionPlan from a SystemRequest. PlanExecutor executes plans - it does not create them. The separation between plan creation and plan execution is an explicit architectural boundary.
// Planner creates an ExecutionPlan from a SystemRequest.
// Implementations range from static plan builders to LLM-based planners.
type Planner interface {
CreatePlan(ctx context.Context, req SystemRequest) (ExecutionPlan, error)
}
Constraints on plan construction:
type PlanPolicy struct {
MaxSteps int // Upper bound on plan length
AllowedStepTypes []StepType // Which step types this mode permits
RequireLinear bool // If true, no branching/parallel steps
TimeoutPerStep time.Duration // Per-step deadline
}
Metadata attached to a plan for observability and debugging:
type PlanTrace struct {
PlannerID string // Which planner created this plan
Rationale string // Why these steps were chosen
CreatedAt time.Time
StepCount int
}
Adapters route requests through a single facade. When mode=plan,
the facade calls the configured Planner to produce an ExecutionPlan, then passes
it to PlanExecutor. When a plan is provided directly (e.g., manually authored),
the Planner step is skipped.
PlanExecutor is for complex workflows with typed steps. Each step has a name, a type (which selects its handler), an input, and an output. The output of step N automatically feeds in as the input of step N+1 when step N+1 has a nil Input. This creates composable data pipelines for RAG, validation, and multi-stage inference.
// StepType categorizes what kind of work a step performs
type StepType string
const (
StepInfer StepType = "infer" // runs model inference
StepTool StepType = "tool" // runs tool call
StepRetrieve StepType = "retrieve" // retrieves context
StepRerank StepType = "rerank" // reranks candidates
StepValidate StepType = "validate" // validates output
)
// Step is one unit of work in an execution plan
type Step struct {
Name string // human-readable step name
Type StepType // determines which handler runs this step
Input any // nil means use previous step's Output
Output any // set by handler after execution
}
// ExecutionPlan is an ordered list of steps
type ExecutionPlan struct {
Steps []Step
}
// StepHandler processes one step. Receives step with Input set,
// returns step with Output set.
type StepHandler func(ctx context.Context, step Step) (Step, error)
If a step's Input is nil and it is
not the first step, PlanExecutor automatically sets Input to the
previous step's Output. Set Input explicitly to override this behavior.
Four built-in handler constructors cover common step types. Each accepts a dependency and
returns a StepHandler. All have nil guards - passing nil returns a
handler that immediately errors with a descriptive message.
// InferHandler - runs inference engine
// Input: inference.Request | Output: *inference.Result
orchestrate.InferHandler(engine)
// RerankHandler - reranks candidates by relevance
// Input: RerankInput | Output: []RerankCandidate
orchestrate.RerankHandler(reranker)
// RetrieveHandler - retrieves context for a query
// Input: string (query) | Output: []core.Message
orchestrate.RetrieveHandler(contextProvider)
// ValidateHandler - validates output with a check function
// Input: string | Output: string (passthrough if valid)
orchestrate.ValidateHandler(func(output string) error {
if output == "" {
return fmt.Errorf("empty output")
}
return nil
})
InferHandler(nil) returns a handler that errors with
"infer handler: nil engine". This fails fast at execution time with a clear message
rather than panicking with a nil pointer.
The plan layer defines its own rerank types to avoid importing the rerank package directly.
This follows the same structural typing pattern as ContextProvider.
// RerankCandidate represents a scored item for plan-level reranking
type RerankCandidate struct {
Content string
Score float64
Source string
Metadata map[string]any
}
// RerankInput groups candidates with the original query
type RerankInput struct {
Query string
Candidates []RerankCandidate
}
// CandidateReranker scores and filters candidates
type CandidateReranker interface {
Rerank(ctx context.Context, input RerankInput) ([]RerankCandidate, error)
}
// Define a retrieve -> rerank -> infer pipeline
plan := orchestrate.ExecutionPlan{
Steps: []orchestrate.Step{
{
Name: "retrieve",
Type: orchestrate.StepRetrieve,
Input: "How do interfaces work in Go?", // explicit query string
},
{
Name: "rerank",
Type: orchestrate.StepRerank,
// Input: nil - auto-chained from retrieve Output
},
{
Name: "infer",
Type: orchestrate.StepInfer,
// Input: nil - auto-chained from rerank Output
},
},
}
// Create executor with handler for each step type
executor := orchestrate.NewPlanExecutor(map[orchestrate.StepType]orchestrate.StepHandler{
orchestrate.StepRetrieve: orchestrate.RetrieveHandler(contextProvider),
orchestrate.StepRerank: orchestrate.RerankHandler(reranker),
orchestrate.StepInfer: orchestrate.InferHandler(engine),
})
// Execute the pipeline
results, err := executor.Execute(ctx, plan)
if err != nil {
log.Fatal(err)
}
// Access each step's output
retrievedMsgs := results[0].Output // []core.Message - retrieved chunks
rerankedItems := results[1].Output // []RerankCandidate - reranked candidates
inferResult := results[2].Output // *inference.Result - final answer
Execute returns all completed steps even on error. If step 2
fails, you still get results[0] with the output from step 1. This enables partial
result inspection for debugging.
// Add validation as a final step
plan := orchestrate.ExecutionPlan{
Steps: []orchestrate.Step{
{Name: "retrieve", Type: orchestrate.StepRetrieve, Input: query},
{Name: "infer", Type: orchestrate.StepInfer},
{Name: "validate", Type: orchestrate.StepValidate},
},
}
executor := orchestrate.NewPlanExecutor(map[orchestrate.StepType]orchestrate.StepHandler{
orchestrate.StepRetrieve: orchestrate.RetrieveHandler(contextProvider),
orchestrate.StepInfer: orchestrate.InferHandler(engine),
orchestrate.StepValidate: orchestrate.ValidateHandler(func(output string) error {
if len(output) < 10 {
return fmt.Errorf("output too short: %d chars", len(output))
}
return nil
}),
})
Any function matching func(ctx context.Context, step Step) (Step, error) is a
valid StepHandler. This makes it easy to add custom processing steps.
// Custom handler that transforms text
summarizeHandler := func(ctx context.Context, step orchestrate.Step) (orchestrate.Step, error) {
input, ok := step.Input.(string)
if !ok {
return step, fmt.Errorf("summarize: expected string input, got %T", step.Input)
}
// Trim whitespace and truncate to 500 runes (not bytes) so that
// multi-byte UTF-8 characters are never split
summary := strings.TrimSpace(input)
if runes := []rune(summary); len(runes) > 500 {
summary = string(runes[:500]) + "..."
}
step.Output = summary
return step, nil
}
// Register custom step type
const StepSummarize orchestrate.StepType = "summarize"
executor := orchestrate.NewPlanExecutor(map[orchestrate.StepType]orchestrate.StepHandler{
orchestrate.StepRetrieve: orchestrate.RetrieveHandler(provider),
StepSummarize: summarizeHandler,
orchestrate.StepInfer: orchestrate.InferHandler(engine),
})
PlanExecutor.Execute checks
ctx.Err() before each step. If the context is cancelled or expired, execution
stops immediately and returns the completed steps so far.
Runs the same inference N times and picks the best result by voting. Implements Triple Modular Redundancy (TMR) / N-Version programming for LLM output.
Why? LLM output is non-deterministic. Running the same prompt multiple times and voting on results improves reliability, especially for classification and structured extraction tasks where consistency matters more than creativity.
Each Call():
1. Runs N SpecializedLoop.Call() invocations sequentially (to avoid GPU contention).
2. Applies the configured VotingStrategy to pick a winner.

type RedundantResult struct {
Content string // winning result chosen by voting strategy
Confidence float64 // how strongly voting supports the winner (0-1)
Candidates []string // all N raw inference outputs
Usage core.TokenUsage // combined token usage across all N calls
}
type RedundantConfig struct {
Engine inference.Engine // required - the LLM backend
SystemPrompt string // system prompt for each SpecializedLoop
Schema core.Schema // JSON schema for structured output
Grammar string // GBNF grammar; empty means none
N int // number of candidates; 0 defaults to 3
Voting VotingStrategy // nil defaults to MajorityVoting{}
MaxTokens int // default 2048
Observer observe.EventLog // event recording
}
| Field | Default | Notes |
|---|---|---|
| N | 3 | Number of inference runs. Higher N = more reliable but slower. |
| Voting | MajorityVoting{} | Strategy for picking a winner from the candidates. |
| MaxTokens | 2048 | Applied to each individual SpecializedLoop call. |
Picks the most frequent result. Confidence equals the fraction of candidates matching the winner. Ties are broken by first-seen order (deterministic).
type MajorityVoting struct{}
// Example with 3 candidates:
// candidates = ["positive", "positive", "negative"]
// winner = "positive", confidence = 2/3 = 0.667
// Example with 5 candidates and tie:
// candidates = ["a", "b", "a", "b", "c"]
// winner = "a" (first-seen with count 2), confidence = 2/5 = 0.4
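A sketch of the majority rule as described, with first-seen tie-breaking. MajorityVote is a hypothetical free function rather than the package's MajorityVoting method, and it compares candidates as exact strings, which the real strategy may or may not do.

```go
package main

import "errors"

// MajorityVote picks the most frequent candidate; ties go to the candidate
// seen first; confidence is the winner's share of all candidates.
func MajorityVote(candidates []string) (string, float64, error) {
	if len(candidates) == 0 {
		return "", 0, errors.New("majority voting: no candidates")
	}
	counts := make(map[string]int)
	order := make([]string, 0, len(candidates)) // distinct values, first-seen order
	for _, c := range candidates {
		if counts[c] == 0 {
			order = append(order, c)
		}
		counts[c]++
	}
	winner := order[0]
	for _, c := range order[1:] {
		if counts[c] > counts[winner] { // strict > keeps the earlier one on ties
			winner = c
		}
	}
	return winner, float64(counts[winner]) / float64(len(candidates)), nil
}
```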
Requires all N candidates to match. Returns an error if any candidate differs. Confidence is always 1.0 when successful.
type UnanimityVoting struct{}
// All match: winner = "positive", confidence = 1.0
// Any differ: error "unanimity voting: candidate 2 differs from candidate 0"
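The unanimity rule is even simpler. This hypothetical UnanimityVote compares every candidate with the first one and reuses the error wording shown above; the empty-input message is an assumption.

```go
package main

import "fmt"

// UnanimityVote succeeds only if every candidate equals the first one.
func UnanimityVote(candidates []string) (string, float64, error) {
	if len(candidates) == 0 {
		return "", 0, fmt.Errorf("unanimity voting: no candidates")
	}
	for i := 1; i < len(candidates); i++ {
		if candidates[i] != candidates[0] {
			return "", 0, fmt.Errorf("unanimity voting: candidate %d differs from candidate 0", i)
		}
	}
	return candidates[0], 1.0, nil
}
```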
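Because voting compares candidate outputs for matches, it works best when every replica emits the same format. Running each replica through SpecializedLoop, and therefore ChatStructured, helps ensure a consistent output format.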
Implement the VotingStrategy interface to create custom voting logic.
type VotingStrategy interface {
Vote(candidates []string) (winner string, confidence float64, err error)
}
// Example: weighted voting by JSON similarity
type SimilarityVoting struct{}
func (s SimilarityVoting) Vote(candidates []string) (string, float64, error) {
if len(candidates) == 0 {
return "", 0, fmt.Errorf("no candidates")
}
if len(candidates) == 1 {
return candidates[0], 1.0, nil // nothing to compare against; avoids dividing by zero below
}
// Score each candidate by average similarity to all others.
// similarity(a, b) is a helper you supply, returning a score in [0, 1].
bestIdx := 0
bestScore := 0.0
for i, a := range candidates {
score := 0.0
for j, b := range candidates {
if i != j {
score += similarity(a, b)
}
}
avg := score / float64(len(candidates)-1)
if avg > bestScore {
bestScore = avg
bestIdx = i
}
}
return candidates[bestIdx], bestScore, nil
}
rl := orchestrate.NewRedundantLoop(orchestrate.RedundantConfig{
Engine: engine,
SystemPrompt: "Classify sentiment as positive, negative, or neutral.",
Schema: core.Schema{
Type: "object",
Properties: map[string]core.Schema{
"sentiment": {Type: "string", Enum: []string{"positive", "negative", "neutral"}},
},
Required: []string{"sentiment"},
},
N: 3, // 3 candidates
Voting: orchestrate.MajorityVoting{}, // majority wins
Grammar: grammarStr, // optional GBNF
})
result, err := rl.Call(ctx, "I love this product!")
if err != nil {
log.Fatal(err)
}
fmt.Printf("Winner: %s\n", result.Content)
// {"sentiment":"positive"}
fmt.Printf("Confidence: %.0f%%\n", result.Confidence*100)
// 100% (if all 3 agree)
fmt.Printf("Candidates: %v\n", result.Candidates)
// all 3 raw outputs
fmt.Printf("Output tokens: %d\n", result.Usage.OutputTokens)
// output tokens summed across all 3 calls
When usage is combined across the N calls, token counts are summed, TokensPerSecond
is averaged, and ContextWindow uses the maximum value seen.
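The combination rule reads naturally as a fold over per-call usage. Usage and CombineUsage below are hypothetical: Usage mirrors only the fields mentioned in this section, plus an InputTokens count that is an assumption about core.TokenUsage.

```go
package main

// Usage is a hypothetical mirror of core.TokenUsage.
type Usage struct {
	InputTokens     int // assumed field
	OutputTokens    int
	TokensPerSecond float64
	ContextWindow   int
}

// CombineUsage sums token counts, averages TokensPerSecond, and keeps the
// largest ContextWindow.
func CombineUsage(calls []Usage) Usage {
	var total Usage
	if len(calls) == 0 {
		return total
	}
	var tpsSum float64
	for _, u := range calls {
		total.InputTokens += u.InputTokens
		total.OutputTokens += u.OutputTokens
		tpsSum += u.TokensPerSecond
		if u.ContextWindow > total.ContextWindow {
			total.ContextWindow = u.ContextWindow
		}
	}
	total.TokensPerSecond = tpsSum / float64(len(calls))
	return total
}
```

Averaging (rather than summing) throughput makes sense here because the N calls run sequentially: the combined run is no faster per token than any single call.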
RedundantLoop produces a Confidence score
based on voting agreement. This score is propagated into
SystemResponse.Confidence with
ConfidenceSource = "voting".
Confidence is not just a loop-local result. It flows into the SystemResponse so callers can use it as a quality gate or informational signal.
| Need | Pattern |
|---|---|
| Multi-turn chat with memory | AgentLoop |
| Single-shot structured output | SpecializedLoop |
| Multi-step pipeline (retrieve -> rerank -> infer) | PlanExecutor |
| High-reliability classification | RedundantLoop |
| Tools + structured output in one call | SpecializedLoop (two-phase) |
| Custom step orchestration | PlanExecutor with custom handlers |
| Free-form chat without tools | AgentLoop (no Tools configured) |
| Confidence-gated decisions | RedundantLoop |
The four patterns are designed to compose. Each higher-level pattern builds on simpler ones:
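- RedundantLoop uses SpecializedLoop internally for each of its N candidates.
- SpecializedLoop uses AgentLoop internally - it creates a fresh one per call.
- PlanExecutor can run any of the other patterns wrapped as StepHandler functions.

// Wrap an AgentLoop as a StepHandler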
agentHandler := func(ctx context.Context, step orchestrate.Step) (orchestrate.Step, error) {
prompt, ok := step.Input.(string)
if !ok {
return step, fmt.Errorf("expected string input")
}
agent := orchestrate.NewAgentLoop(orchestrate.LoopConfig{
Engine: engine,
Tools: tools,
SystemPrompt: "You are a research assistant.",
})
result, err := agent.Chat(ctx, prompt)
if err != nil {
return step, err
}
step.Output = result.Content
return step, nil
}
// Wrap a RedundantLoop as a StepHandler
reliableClassifier := func(ctx context.Context, step orchestrate.Step) (orchestrate.Step, error) {
prompt, ok := step.Input.(string)
if !ok {
return step, fmt.Errorf("expected string input")
}
rl := orchestrate.NewRedundantLoop(orchestrate.RedundantConfig{
Engine: engine,
Schema: classificationSchema,
N: 3,
})
result, err := rl.Call(ctx, prompt)
if err != nil {
return step, err
}
step.Output = result.Content
return step, nil
}
// Compose in a pipeline: research with tools, then reliably classify
const StepResearch orchestrate.StepType = "research"
const StepClassify orchestrate.StepType = "classify"
executor := orchestrate.NewPlanExecutor(map[orchestrate.StepType]orchestrate.StepHandler{
StepResearch: agentHandler,
StepClassify: reliableClassifier,
})
plan := orchestrate.ExecutionPlan{
Steps: []orchestrate.Step{
{Name: "research", Type: StepResearch, Input: "Analyze market trends for Q4"},
{Name: "classify", Type: StepClassify}, // auto-chained from research
},
}
results, err := executor.Execute(ctx, plan)
ContextProvider and CandidateReranker
are interfaces defined in the orchestrate package. Other packages (like context and
rerank) satisfy them via Go structural typing without importing orchestrate.
This keeps layer dependencies flowing downward.