Execution Engine and Runtime¶
This document is the authoritative reference for Agent Baton's execution
engine (agent_baton/core/engine/) and runtime system
(agent_baton/core/runtime/). It covers the complete lifecycle of an
orchestrated task from plan creation through completion, including the
state machine, knowledge resolution, gate system, daemon mode, and crash
recovery.
1. Overview¶
The execution engine is the core of Agent Baton. It converts a human-readable task description into a machine-readable execution plan, then drives that plan through a state machine that coordinates multiple AI agents, enforces quality gates, and persists state for crash recovery.
Design Philosophy¶
Separation of concerns. The system is split into two layers:
- Engine (
core/engine/) -- deterministic state machine. Owns plan state, decides what happens next, records results. Stateless between calls (all state lives on disk). Called repeatedly by the driving session. - Runtime (
core/runtime/) -- async execution layer. Launches agents, manages concurrency, handles signals, runs the daemon. Wraps the engine without replacing it.
The engine is the source of truth. The runtime calls into the engine
via the ExecutionDriver protocol. The engine never calls the runtime.
This means the engine can be driven by the CLI, the runtime, or tests
-- all through the same API.
State is always on disk. Every state mutation is persisted before
the engine returns. If the process crashes between calls, baton execute
resume picks up exactly where it left off.
Component Map¶
core/engine/ core/runtime/
planner.py IntelligentPlanner worker.py TaskWorker
executor.py ExecutionEngine supervisor.py WorkerSupervisor
dispatcher.py PromptDispatcher scheduler.py StepScheduler
gates.py GateRunner launcher.py AgentLauncher (protocol)
persistence.py StatePersistence claude_launcher.py ClaudeCodeLauncher
protocols.py ExecutionDriver context.py ExecutionContext
knowledge_resolver.py KnowledgeResolver daemon.py daemonize()
knowledge_gap.py parse/escalation signals.py SignalHandler
plan_reviewer.py PlanReviewer decisions.py DecisionManager
consolidator.py CommitConsolidator
Relationship Between Engine and Runtime¶
CLI (baton execute next)
|
v
+---------------------------------------------------+
| ExecutionEngine |
| (state machine, plan state, trace recording) |
+---------------------------------------------------+
^ ^
| |
CLI-driven loop Runtime-driven loop
(synchronous, (async, parallel,
one action at a time) daemon mode)
| |
v v
Claude session TaskWorker
reads _print_action() calls engine.next_actions()
spawns Agent tool dispatches via StepScheduler
calls baton execute record records results back
2. Execution Lifecycle¶
The complete flow from task description to completion:
baton plan "task" --save --explain
|
v
IntelligentPlanner.create_plan()
|
v
plan.json + plan.md written to .claude/team-context/
|
v
baton execute start
|
v
ExecutionEngine.start(plan)
- Creates ExecutionState
- Starts trace
- Saves state to disk
- Returns first action
|
v
+--> ExecutionEngine.next_action()
| |
| +-- DISPATCH --> caller spawns agent
| | baton execute dispatched
| | (agent works)
| | baton execute record
| |
| +-- GATE -----> caller runs gate command
| | baton execute gate --result pass|fail
| |
| +-- APPROVAL --> caller presents to human
| | baton execute approve
| |
| +-- WAIT -----> parallel steps in flight; retry
| |
| +-- FAILED ---> execution stops
| |
| +-- COMPLETE -> baton execute complete
| - Writes trace
+---<--- (loop) - Writes usage log
- Generates retrospective
- Triggers improvement cycle
ASCII Sequence Diagram: CLI-Driven Execution¶
Claude CLI Engine Disk
| | | |
|--plan------->| | |
| |--create_plan->| |
| |<--MachinePlan-| |
| |--save---------|------------->| plan.json
| | | |
|--execute---->| | |
| start |--start(plan)->| |
| | |--save------->| execution-state.json
| |<--DISPATCH----| |
| | | |
|--Agent tool->| (spawns agent)| |
| | | |
|--execute---->| | |
| record |--record------>| |
| | |--save------->| execution-state.json
| | | |
|--execute---->| | |
| next |--next_action->| |
| |<--GATE--------| |
| | | |
|--execute---->| | |
| gate |--record_gate->| |
| | |--save------->| execution-state.json
| | | |
| ... | (repeats) | |
| | | |
|--execute---->| | |
| complete |--complete()--->| |
| | |--save------->| trace-YYYY.json
| | |--save------->| usage-log.jsonl
| | |--save------->| retrospective.json
| |<--summary-----| |
3. Planning System¶
IntelligentPlanner¶
Source: agent_baton/core/engine/planner.py
The planner transforms a task description into a MachinePlan. It is
data-driven: when historical data is available, plans are shaped by
learned patterns, agent performance scores, and budget recommendations.
When no history exists, it falls back to sensible defaults.
Constructor¶
IntelligentPlanner(
team_context_root: Path | None = None,
classifier: DataClassifier | None = None,
policy_engine: PolicyEngine | None = None,
retro_engine: RetroEngine | None = None,
knowledge_registry: KnowledgeRegistry | None = None,
)
All dependencies are optional. The planner degrades gracefully:
| Dependency | When absent |
|---|---|
PatternLearner |
Default phase templates used |
PerformanceScorer |
No score warnings |
BudgetTuner |
Agent-count heuristic |
DataClassifier |
Keyword-only risk assessment |
PolicyEngine |
No policy validation |
RetroEngine |
No retrospective feedback integration |
KnowledgeRegistry |
No knowledge resolution |
create_plan() -- 15-Step Pipeline¶
def create_plan(
self,
task_summary: str,
*,
task_type: str | None = None,
project_root: Path | None = None,
agents: list[str] | None = None,
phases: list[dict] | None = None,
explicit_knowledge_packs: list[str] | None = None,
explicit_knowledge_docs: list[str] | None = None,
intervention_level: str = "low",
) -> MachinePlan:
Steps executed in order:
- Generate task_id -- format:
YYYY-MM-DD-<slug>-<8-char-uuid>. - Detect project stack --
AgentRouter.detect_stack(project_root). - Infer task type -- keyword matching against
_TASK_TYPE_KEYWORDSin priority order: bug-fix, migration, refactor, data-analysis, new-feature, test, documentation. Falls back tonew-feature. - Pattern lookup --
PatternLearner.get_patterns_for_task()with minimum confidence threshold of 0.7. - Determine agents -- explicit override, pattern recommendation,
or
_DEFAULT_AGENTSlookup by task type. - 5b. Retrospective feedback -- filter dropped agents, record
preferences via
_apply_retro_feedback(). - Route agents --
AgentRouter.route()maps base names to stack-specific flavors (e.g.backend-engineertobackend-engineer--python). - 6.5. Create
KnowledgeResolverif registry is available. - Classify sensitivity --
DataClassifier.classify()if available. - Assess risk -- combines classifier output with keyword signals
(
_RISK_SIGNALS) and structural signals (agent count, sensitive agents, destructive verbs). Read-only dampening prevents false positives. - 8b. Git strategy --
COMMIT_PER_AGENTfor LOW/MEDIUM,BRANCH_PER_AGENTfor HIGH/CRITICAL. - Build phases -- from explicit dicts, pattern, or defaults.
- 9b. Enrich -- cross-phase context references, default
deliverables from
_AGENT_DELIVERABLES. - 9.5. Resolve knowledge --
KnowledgeResolver.resolve()per step with explicit packs/docs. - 9.6. Gap-suggested attachments -- query pattern learner for prior gaps matching agent + task type.
- Score check -- warn about low-health agents via
PerformanceScorer. - Budget tier --
BudgetTunerrecommendation or heuristic (lean/standard/full based on agent count).- 11b. Policy validation -- check agent assignments against active policy set. Violations are warnings, never hard-blocks.
- Add QA gates -- default gates per phase name:
- Implement/Fix:
buildgate (pytest) - Test:
testgate (pytest --cov) - Review and others: no automated gate
- 12b. Approval gates -- HIGH/CRITICAL risk plans get approval gates on Design/Research phases.
- 12c. Team consolidation -- multi-agent Implement/Fix phases are merged into team steps.
- Implement/Fix:
- Context files -- every step gets
CLAUDE.md; file paths extracted from task summary are appended.- 13b. Model inheritance -- agent definition model preferences.
- 13c. Context richness -- file path extraction from summary.
- Build shared context -- mission summary, risk, governance results, retrospective insights.
- Return
MachinePlan.
Task Type Inference¶
Keywords are matched in priority order (first match wins):
| Task Type | Keywords |
|---|---|
bug-fix |
fix, bug, broken, error, crash, traceback, exception, patch |
migration |
migrate, migration, upgrade, move |
refactor |
refactor, clean, reorganize, restructure, rename, cleanup |
data-analysis |
analyze, report, dashboard, query, insight, metric |
new-feature |
add, build, create, implement, new, feature, develop |
test |
test, tests, testing, coverage, e2e, unit, integration |
documentation |
doc, docs, readme, spec, adr, document, wiki, review, summarize |
Risk Assessment¶
Risk is computed by combining signals:
- Keyword signals -- words like "production", "infrastructure",
"deploy", "security" map to HIGH; "migration", "database" map to
MEDIUM (via
_RISK_SIGNALS). - Structural signals -- more than 5 agents raises to at least MEDIUM; sensitive agent types (security-reviewer, auditor, devops) raise to MEDIUM; destructive verbs raise to MEDIUM.
- Read-only dampening -- if the first word is a read-only verb (review, analyze, inspect) and no sensitive agents are present, the score is capped at LOW.
- Classifier floor -- if
DataClassifieris available, its risk level is the floor (keyword signals can raise but not lower it).
Agent Routing¶
The planner uses affinity-based assignment to distribute agents across phases:
- Pass 1: assign agents to their ideal phases based on
_PHASE_IDEAL_ROLES(e.g. architect to Design, test-engineer to Test). - Pass 2: remaining agents assigned to remaining phases round-robin.
- Pass 3: unassigned phases get the best-fit agent from the full pool.
- Pass 4: leftover agents added to phases where they have affinity.
- Guarantee: every phase has at least one step.
Plan Structure¶
MachinePlan
task_id: str
task_summary: str
risk_level: "LOW" | "MEDIUM" | "HIGH" | "CRITICAL"
budget_tier: "lean" | "standard" | "full"
git_strategy: "Commit-per-agent" | "Branch-per-agent"
task_type: str
intervention_level: "low" | "medium" | "high"
phases: list[PlanPhase]
phase_id: int (1-based)
name: str
approval_required: bool
steps: list[PlanStep]
step_id: str (e.g. "1.1")
agent_name: str
task_description: str
model: str (default "sonnet")
depends_on: list[str]
context_files: list[str]
knowledge: list[KnowledgeAttachment]
team: list[TeamMember] (non-empty = team step)
gate: PlanGate | None
gate_type: "build" | "test" | "lint" | "spec" | "review"
command: str
fail_on: list[str]
explain_plan()¶
Returns a human-readable markdown explanation including: - Pattern influence (pattern ID, confidence, success rate) - Score warnings for low-health agents - Agent routing decisions - Data classification and guardrail preset - Policy notes and violations - Phase summary with agents and gates
4. Execution Engine¶
ExecutionEngine¶
Source: agent_baton/core/engine/executor.py
The engine is a state machine that is called repeatedly by the driving session. Each call reads state from disk, computes the next action, and persists the updated state.
Constructor¶
ExecutionEngine(
team_context_root: Path | None = None,
bus: EventBus | None = None,
task_id: str | None = None,
storage=None, # SqliteStorage | FileStorage | None
knowledge_resolver=None, # KnowledgeResolver | None
)
The engine supports two storage modes:
| Mode | Primary I/O | Dual-write |
|---|---|---|
| Legacy file | StatePersistence (JSON files) |
N/A |
| Storage backend | SqliteStorage |
File persistence for backward compat |
When a storage backend is provided, the engine writes to SQLite
first, then dual-writes to the file system so file-based readers
(scanner, list/switch) stay current during the transition.
State Machine¶
start(plan)
|
v
+-- RUNNING --+
| |
+----------+----------+ |
| | | |
DISPATCH GATE APPROVAL|
| | | |
v v v |
record_ record_ record_ |
step_ gate_ approval|
result result _result |
| | | |
+----------+---+-------+ |
| | |
v v |
GATE_PENDING APPROVAL_PENDING
| |
v v
(re-enters RUNNING after result recorded)
|
v
COMPLETE <-- all phases done
|
v
FAILED <-- step failed or gate failed
Status Values¶
| Status | Meaning |
|---|---|
running |
Normal execution in progress |
gate_pending |
Waiting for gate result |
approval_pending |
Waiting for human approval |
complete |
All phases finished successfully |
failed |
A step or gate failed |
Action Types¶
| ActionType | Enum Value | When Returned |
|---|---|---|
DISPATCH |
"dispatch" |
Next step is ready for agent execution |
GATE |
"gate" |
All steps in phase complete; run QA gate |
APPROVAL |
"approval" |
Phase requires human approval before proceeding |
WAIT |
"wait" |
Parallel steps still in flight |
COMPLETE |
"complete" |
All phases exhausted; execution done |
FAILED |
"failed" |
Unrecoverable failure |
Public API¶
start(plan) -> ExecutionAction¶
Initialize execution from a MachinePlan:
- Creates ExecutionState with status="running".
- Starts a trace via TraceRecorder.
- Sets the active task ID.
- Persists state.
- Returns the first action via _determine_action().
next_action() -> ExecutionAction¶
Load state from disk and return the next action. The core state machine
logic (_determine_action) executes:
- Terminal check: if
completeorfailed, return immediately. approval_pending: return APPROVAL action.gate_pending: return GATE action.- Phase exhaustion: all phases done -> COMPLETE.
- Failed step check: any step failed -> FAILED.
- Find next dispatchable step (not completed, failed, dispatched, or
interrupted; all
depends_onsatisfied). - If found: return DISPATCH (or team DISPATCH for team steps).
- If steps still pending (dispatched or dependency-blocked): WAIT.
- All steps done: check approval -> gate -> advance to next phase.
next_actions() -> list[ExecutionAction]¶
Return ALL currently dispatchable actions for parallel execution.
Unlike next_action(), this returns every step whose dependencies are
satisfied. The caller can spawn all returned agents in parallel.
mark_dispatched(step_id, agent_name)¶
Record that a step is in-flight (status "dispatched"). Used by the
runtime to prevent the engine from re-dispatching steps during parallel
execution.
record_step_result(step_id, agent_name, status, outcome, ...)¶
Record an agent's result:
- Creates StepResult and appends to state.
- Extracts deviations from outcome text (sections headed
## Deviation/## Deviations).
- Runs the knowledge gap protocol on the outcome.
- Emits trace events (agent_complete or agent_failed).
- Logs telemetry.
- Persists state.
Valid status values: "complete", "failed", "dispatched",
"interrupted".
record_gate_result(phase_id, passed, output)¶
Record a gate check result:
- Creates GateResult and appends to state.
- If failed: sets status = "failed".
- If passed: advances current_phase and resets current_step_index.
- Emits trace events and domain events.
record_approval_result(phase_id, result, feedback)¶
Record a human approval decision:
- Valid results: "approve", "reject", "approve-with-feedback".
- "reject": sets status to "failed".
- "approve": sets status to "running".
- "approve-with-feedback": triggers _amend_from_feedback() which
inserts a remediation phase after the current phase, then sets status
to "running".
amend_plan(description, new_phases, insert_after_phase, ...)¶
Amend the running plan mid-execution:
- Inserts new phases at the specified position (default: after current
phase).
- Adds new steps to existing phases.
- Re-numbers all phase and step IDs via _renumber_phases().
- Creates a PlanAmendment audit record.
- Emits a replan trace event.
record_team_member_result(step_id, member_id, agent_name, ...)¶
Record a single team member's work within a team step:
- Creates TeamStepResult and appends to the parent StepResult.
- When all members complete, the parent step auto-completes.
- If any member fails, the parent step fails.
complete() -> str¶
Finalize execution:
1. Sets status = "complete".
2. Reconstructs trace from state if self._trace is None (CLI mode
creates a fresh engine instance per call).
3. Completes the trace via TraceRecorder.
4. Builds and logs a TaskUsageRecord.
5. Generates a retrospective via RetrospectiveEngine with rich
qualitative data (what worked, what didn't, knowledge gaps, roster
recommendations, sequencing notes).
6. Triggers the improvement loop (ImprovementLoop.run_cycle()).
7. Returns a summary string.
status() -> dict¶
Returns: task_id, status, current_phase, steps_complete,
steps_total, gates_passed, gates_failed, elapsed_seconds.
resume() -> ExecutionAction¶
Crash recovery:
1. Loads state from disk.
2. Reconnects the in-memory trace (loads existing or starts fresh).
3. Returns the next action via _determine_action().
recover_dispatched_steps() -> int¶
Clears stale dispatched step markers after a daemon crash, so the
engine will re-dispatch them on the next next_action() call. Returns
the number of recovered steps.
5. Dispatcher¶
PromptDispatcher¶
Source: agent_baton/core/engine/dispatcher.py
Stateless class that generates delegation prompts for agent subagents. Every method operates purely on its arguments.
build_delegation_prompt()¶
def build_delegation_prompt(
self,
step: PlanStep,
*,
shared_context: str = "",
handoff_from: str = "",
project_description: str = "",
task_summary: str = "",
task_type: str = "",
) -> str:
Produces a structured markdown prompt with these sections:
- Role line -- "You are a {role} working on {project}."
- Shared Context -- mission summary, risk, governance.
- Intent -- user's original task summary (verbatim).
- Knowledge Context -- inline knowledge documents.
- Knowledge References -- referenced documents with retrieval hints.
- Your Task (Step X) -- the step's task description.
- Success Criteria -- selected by task type from
_SUCCESS_CRITERIA(bug-fix, new-feature, refactor, test, documentation, migration, data-analysis). - Files to Read --
step.context_files. - Deliverables -- expected outputs.
- Boundaries -- allowed/blocked paths.
- Knowledge Gaps -- instructions for the agent to signal gaps.
- Previous Step Output -- handoff from preceding step.
- Decision Logging -- document non-obvious decisions.
- Deviations -- document plan misfit.
build_team_delegation_prompt()¶
Similar to build_delegation_prompt but tailored for team members:
- Includes team overview and member role.
- References dependencies on other team members.
- Knowledge is resolved at step level and shared across all members.
build_gate_prompt()¶
For automated gates: returns the command string (with {files}
placeholder substitution).
For review gates (no command): returns a reviewer prompt with the gate type, review task, fail criteria, and instructions.
build_path_enforcement()¶
Generates a bash guard command for PreToolUse hooks that blocks
writes outside allowed paths or inside blocked paths. Returns None
when the step has no path restrictions.
build_action()¶
Combines build_delegation_prompt + build_path_enforcement into a
complete ExecutionAction with action_type=DISPATCH.
Knowledge Injection¶
The dispatcher builds knowledge sections from KnowledgeAttachment
objects:
- Inline attachments: full content loaded from
source_pathviaparse_frontmatter(), rendered under## Knowledge Context. - Reference attachments: path + summary + retrieval hint, rendered
under
## Knowledge References. Retrieval hint is eitherRead <path>orquery RAG serverdepending onretrievalfield.
ContextHarvester (Wave 2.2)¶
Source: agent_baton/core/intel/context_harvester.py
Runs after every successful step (record_step_result →
ContextHarvester.harvest) and writes a compact 3-5 line summary into
the agent_context table keyed by (agent_name, domain). Domain is
derived from PlanStep.allowed_paths[0] (or files_changed[0]),
falling back to "general". On the next dispatch the executor reads
the row via ContextHarvester.fetch_one and passes it through
build_delegation_prompt(prior_context_block=...), prepending a
## Prior Context block (capped at 400 chars) so the agent skips
cold-start re-discovery. Best-effort — disabled by
BATON_HARVEST_CONTEXT=0. Inspected via baton context show <agent>.
6. Gate System¶
GateRunner¶
Source: agent_baton/core/engine/gates.py
Stateless class that builds gate actions and evaluates gate results.
Gate Types¶
| Type | Command | Evaluation Rule |
|---|---|---|
build |
python -m py_compile {files} |
passed = (exit_code == 0) |
test |
pytest --tb=short -q |
passed = (exit_code == 0) |
lint |
python -m py_compile {files} |
passed = (exit_code == 0 AND no error markers) |
spec |
(varies) | Delegates to SpecValidator.run_gate() |
review |
(none) | Always passes (advisory) |
approval |
(none) | Human checkpoint |
build_gate_action()¶
Builds an ExecutionAction with action_type=GATE:
- Populates gate_type, gate_command (with {files} substitution),
and phase_id.
evaluate_output()¶
Evaluates the output of a gate command and returns a GateResult:
- test/build: pass when exit_code == 0.
- lint: pass when exit_code == 0 AND no lint error markers in
output. Error markers: ": error:", ":E:", " E ", "[E",
"Error:", "ERROR", "SyntaxError", "error:".
- spec: delegates to SpecValidator.run_gate().
- review: always passes (advisory).
- Unknown types: fall back to exit_code == 0.
default_gates()¶
Returns a fresh dict of built-in gate definitions (build, test, lint, review). Callers may mutate the returned values.
7. State Persistence¶
StatePersistence¶
Source: agent_baton/core/engine/persistence.py
Handles reading and writing ExecutionState to disk with crash-safe
atomic writes (write to .json.tmp, then rename()).
Storage Layout¶
.claude/team-context/
executions/
<task-id-1>/execution-state.json
<task-id-2>/execution-state.json
active-task-id.txt <- points to default task
execution-state.json <- legacy flat file (backward compat)
When task_id is provided, state is stored under the namespaced path.
Without task_id, falls back to the legacy flat path.
API¶
| Method | Description |
|---|---|
save(state) |
Atomically write state (tmp + rename) |
load() |
Load state; returns None on missing or parse error |
exists() |
Check if state file exists |
clear() |
Remove state file |
set_active() |
Write task_id to active-task-id.txt |
get_active_task_id(root) |
Read active task ID (static) |
list_executions(root) |
List all namespaced task IDs (static) |
load_all(root) |
Load all states (namespaced + legacy) (static) |
Task-ID Resolution Order¶
Every baton execute subcommand resolves a target task ID through:
See docs/invariants.md Invariant 1 for the full contract.
execution-state.json Schema¶
{
"task_id": "2026-03-24-add-oauth-abc12345",
"plan": { /* MachinePlan.to_dict() */ },
"current_phase": 1,
"current_step_index": 0,
"status": "running",
"step_results": [ /* StepResult.to_dict() */ ],
"gate_results": [ /* GateResult.to_dict() */ ],
"approval_results": [ /* ApprovalResult.to_dict() */ ],
"amendments": [ /* PlanAmendment.to_dict() */ ],
"started_at": "2026-03-24T10:00:00+00:00",
"completed_at": "",
"pending_gaps": [ /* KnowledgeGapSignal.to_dict() */ ],
"resolved_decisions": [ /* ResolvedDecision.to_dict() */ ]
}
8. Knowledge Resolution¶
KnowledgeResolver¶
Source: agent_baton/core/engine/knowledge_resolver.py
Resolves knowledge attachments for each plan step through a 4-layer
pipeline with deduplication. Produces KnowledgeAttachment objects
with inline/reference delivery decisions governed by token budgets.
Constructor¶
KnowledgeResolver(
registry: KnowledgeRegistry,
*,
agent_registry: AgentRegistry | None = None,
rag_available: bool = False,
step_token_budget: int = 32_000,
doc_token_cap: int = 8_000,
)
4-Layer Resolution Pipeline¶
Layer 1: Explicit User-supplied --knowledge-pack / --knowledge flags
|
Layer 2: Agent-declared Packs listed in agent definition frontmatter
|
Layer 3: Tag matching Strict keyword/tag match against registry
|
Layer 4: Relevance TF-IDF search (only if Layer 3 returned nothing)
Each layer's results are deduplicated against earlier layers using a
key of source_path (preferred) or pack_name::doc_name.
Within each layer, documents are sorted by priority: high -> normal -> low.
Delivery Decision¶
For each resolved document, the resolver applies these rules:
| Condition | Delivery |
|---|---|
token_estimate <= 0 |
reference (unestimated) |
token_estimate > doc_token_cap (8K) |
reference (too large) |
token_estimate <= remaining_budget |
inline (fits budget, budget decremented) |
| Otherwise | reference (budget exhausted) |
Reference deliveries get retrieval="mcp-rag" when RAG is available,
otherwise retrieval="file".
KnowledgeGap -- Runtime Gap Detection¶
Source: agent_baton/core/engine/knowledge_gap.py
Handles parsing KNOWLEDGE_GAP signals from agent outcomes:
parse_knowledge_gap()¶
Parses the structured signal block from outcome text. Returns
KnowledgeGapSignal or None. Defaults: confidence="low",
gap_type="factual".
determine_escalation()¶
Applies the escalation matrix:
| Gap Type | Resolution | Risk x Intervention | Action |
|---|---|---|---|
| factual | match found | any | auto-resolve |
| factual | no match | LOW + low intervention | best-effort |
| factual | no match | LOW + medium/high | queue-for-gate |
| factual | no match | MEDIUM+ any | queue-for-gate |
| contextual | -- | any | queue-for-gate |
Engine Integration¶
When record_step_result() detects a knowledge gap:
- Parse the signal from outcome text.
- Attempt auto-resolution via
KnowledgeResolverif available. - Call
determine_escalation(). - auto-resolve: Record a
ResolvedDecisionon the state, amend the plan to insert a re-dispatch step for the same agent with the resolved knowledge attached. The interrupted step is marked"interrupted"and skipped by_determine_action(). - best-effort: Log and continue (no plan mutation).
- queue-for-gate: Append the signal to
state.pending_gapswhere it surfaces at the next human approval gate.
9. Runtime System¶
TaskWorker¶
Source: agent_baton/core/runtime/worker.py
Async event loop that drives a single task's execution. Wraps the
ExecutionEngine via the ExecutionDriver protocol.
TaskWorker(
engine: ExecutionDriver,
launcher: AgentLauncher,
bus: EventBus | None = None,
max_parallel: int = 3,
decision_manager: DecisionManager | None = None,
shutdown_event: asyncio.Event | None = None,
gate_poll_interval: float = 2.0,
)
Execution Loop¶
The core loop:
- Call
engine.next_action(). - COMPLETE: call
engine.complete(), return summary. - FAILED: return error message.
- WAIT: sleep 0.5s and retry.
- APPROVAL: route through
DecisionManageror auto-approve. - GATE: auto-approve programmatic gates (test, build, lint, spec);
route human gates through
DecisionManager. - DISPATCH: collect ALL dispatchable steps via
engine.next_actions(), mark each dispatched, dispatch in parallel viaStepScheduler, record all results.
Event Publishing¶
The worker publishes step-level events (step.dispatched,
step.completed, step.failed). Task-level and phase-level events are
published by the ExecutionEngine itself. This split avoids event
duplication.
StepScheduler¶
Source: agent_baton/core/runtime/scheduler.py
Bounded-concurrency dispatcher using asyncio.Semaphore.
@dataclass
class SchedulerConfig:
max_concurrent: int = 3
class StepScheduler:
async def dispatch(self, agent_name, model, prompt, step_id, launcher) -> LaunchResult
async def dispatch_batch(self, steps: list[dict], launcher) -> list[LaunchResult]
dispatch_batch() starts all steps concurrently but at most
max_concurrent run simultaneously. Results are returned in the same
order as the input steps.
AgentLauncher (Protocol)¶
Source: agent_baton/core/runtime/launcher.py
class AgentLauncher(Protocol):
async def launch(
self,
agent_name: str,
model: str,
prompt: str,
step_id: str = "",
) -> LaunchResult: ...
@dataclass
class LaunchResult:
step_id: str
agent_name: str
status: str = "complete" # "complete" or "failed"
outcome: str = ""
files_changed: list[str]
commit_hash: str = ""
estimated_tokens: int = 0
duration_seconds: float = 0.0
error: str = ""
Implementations:
- DryRunLauncher -- mock for testing; records launches, returns
synthetic results. Supports pre-configured per-step results via
set_result().
- ClaudeCodeLauncher -- production launcher that invokes the claude
CLI.
ClaudeCodeLauncher¶
Source: agent_baton/core/runtime/claude_launcher.py
Invokes the claude CLI as an async subprocess with strict security
properties:
- Environment whitelist: only
PATH,HOME, and explicitly listed variables (ANTHROPIC_API_KEY,CLAUDE_CODE_USE_BEDROCK,CLAUDE_CODE_USE_VERTEX,AWS_PROFILE,AWS_REGION) are forwarded.os.environis never copied wholesale. - No shell interpolation:
asyncio.create_subprocess_execis used exclusively. The prompt is always a separate list element. - Binary validation: the
claudebinary path is validated at construction time viashutil.which(). - API key redaction:
sk-ant-*patterns are stripped from error text.
Configuration¶
@dataclass
class ClaudeCodeConfig:
claude_path: str = "claude"
working_directory: Path | None = None
default_timeout_seconds: float = 600.0
model_timeouts: dict = {"opus": 900, "sonnet": 600, "haiku": 300}
max_retries: int = 3
base_retry_delay: float = 5.0
max_outcome_length: int = 4000
prompt_file_threshold: int = 131_072 # 128 KB
env_passthrough: list[str] = [...]
Launch Flow¶
- Record pre-launch git HEAD.
- Build command:
claude --print --model <model> --output-format json. If agent definition exists, add--system-prompt,--permission-mode,--allowedTools. - If prompt exceeds 128 KB, deliver via stdin instead of
-pflag. - Run subprocess with timeout.
- On rate-limit error (429), retry with exponential backoff (up to
max_retries). - Parse output: attempt JSON (structured), fall back to raw text.
- If agent committed code, diff pre/post HEAD to populate
files_changedandcommit_hash.
ExecutionContext¶
Source: agent_baton/core/runtime/context.py
Factory that guarantees EventBus, ExecutionEngine, and
EventPersistence are all wired to the same bus instance.
@classmethod
def build(
cls,
*,
launcher: AgentLauncher,
team_context_root: Path | None = None,
bus: EventBus | None = None,
persist_events: bool = True,
task_id: str | None = None,
) -> ExecutionContext:
The engine auto-wires EventPersistence as a bus subscriber when a bus
is provided. ExecutionContext.build() does NOT create a second
persistence instance to avoid duplicate JSONL writes.
WorkerSupervisor¶
Source: agent_baton/core/runtime/supervisor.py
Lifecycle management for daemon-mode execution. Wraps TaskWorker
with PID file management, structured logging, and graceful shutdown.
Files Managed¶
| Namespaced (with task_id) | Legacy |
|---|---|
executions/<task_id>/worker.pid |
daemon.pid |
executions/<task_id>/worker.log |
daemon.log |
executions/<task_id>/worker-status.json |
daemon-status.json |
start()¶
- Write PID file with
flock()exclusive lock (prevents double-start). - Configure rotating file logging (10 MB, 3 backups).
- Build
ExecutionContextwith launcher, bus, task_id. - Call
engine.start(plan)orengine.resume(). - Create
TaskWorkerand run viaasyncio.run(). - Install signal handlers (SIGTERM, SIGINT) for graceful shutdown.
- On completion/crash: write status snapshot, remove PID file.
Graceful Shutdown¶
async def _run_with_signals(self, worker):
handler = SignalHandler()
handler.install()
worker_task = asyncio.create_task(worker.run())
signal_task = asyncio.create_task(handler.wait())
done, pending = await asyncio.wait(
{worker_task, signal_task},
return_when=asyncio.FIRST_COMPLETED,
)
if signal_task in done:
worker_task.cancel()
await asyncio.wait_for(worker_task, timeout=30.0)
When a signal arrives, the worker task is cancelled and the supervisor waits up to 30 seconds for in-flight agents to drain.
stop()¶
Sends SIGTERM to the daemon PID and polls for exit up to a timeout.
Cleans up stale PID files.
list_workers()¶
Static method that scans all execution directories for running worker
processes. Checks liveness via os.kill(pid, 0).
Daemon Mode¶
Source: agent_baton/core/runtime/daemon.py
UNIX double-fork daemonization (daemonize()):
- First fork -- parent exits (shell returns immediately).
os.setsid()-- child becomes session leader.- Second fork -- session leader exits (grandchild cannot reacquire terminal).
- Redirect stdin/stdout/stderr to
/dev/null. Higher FDs (logging, PID flock) are preserved. - Working directory is preserved (agent-baton uses relative paths).
Must be called BEFORE asyncio.run(). Not available on Windows.
SignalHandler¶
Source: agent_baton/core/runtime/signals.py
class SignalHandler:
def install(self) # Install SIGTERM + SIGINT handlers
def uninstall(self) # Restore original handlers
async def wait(self) # Block until signal received
@property
def shutdown_requested(self) -> bool
Uses asyncio.get_running_loop().add_signal_handler() to set a
shutdown event when SIGTERM or SIGINT is received.
DecisionManager¶
Source: agent_baton/core/runtime/decisions.py
Manages human decision requests during async execution via filesystem persistence.
class DecisionManager:
def request(req: DecisionRequest) -> Path # Persist + publish event
def resolve(request_id, chosen_option, ...) # Mark resolved + publish
def get(request_id) -> DecisionRequest | None
def pending() -> list[DecisionRequest]
def list_all() -> list[DecisionRequest]
def get_resolution(request_id) -> dict | None
Each request produces:
- <request_id>.json -- machine-readable request.
- <request_id>.md -- human-readable summary with resolution
instructions (baton decide --resolve <id> --option <OPTION>).
Resolution creates <request_id>-resolution.json and publishes a
human_decision_resolved event.
10. Protocols¶
ExecutionDriver¶
Source: agent_baton/core/engine/protocols.py
The most critical contract in the system. Defines the interface between
the async TaskWorker and the synchronous ExecutionEngine.
class ExecutionDriver(Protocol):
def start(self, plan: MachinePlan) -> ExecutionAction: ...
def next_action(self) -> ExecutionAction: ...
def next_actions(self) -> list[ExecutionAction]: ...
def mark_dispatched(self, step_id: str, agent_name: str) -> None: ...
def record_step_result(
self, step_id, agent_name, status, outcome,
files_changed, commit_hash, estimated_tokens,
duration_seconds, error,
) -> None: ...
def record_gate_result(self, phase_id, passed, output) -> None: ...
def record_approval_result(self, phase_id, result, feedback) -> None: ...
def amend_plan(self, description, ...) -> PlanAmendment: ...
def record_team_member_result(self, step_id, member_id, ...) -> None: ...
def complete(self) -> str: ...
def status(self) -> dict: ...
def resume(self) -> ExecutionAction: ...
Any class implementing this protocol can serve as the engine for orchestrated execution.
AgentLauncher¶
Source: agent_baton/core/runtime/launcher.py
class AgentLauncher(Protocol):
async def launch(
self, agent_name, model, prompt, step_id
) -> LaunchResult: ...
Implementations can be Claude Code subagents, subprocess calls, API requests, or dry-run mocks.
11. Configuration¶
Engine Configuration¶
The engine configuration is primarily structural -- determined by constructor arguments:
| Parameter | Default | Description |
|---|---|---|
team_context_root |
.claude/team-context |
Root directory for all state files |
bus |
None | EventBus for domain event publishing |
task_id |
None | Namespaced execution (None = legacy flat file) |
storage |
None | SQLite backend (None = file-only mode) |
knowledge_resolver |
None | For runtime gap auto-resolution |
Planner Configuration¶
The planner's behavior is driven by data sources, not configuration flags:
| Data Source | Effect on Plans |
|---|---|
PatternLearner (usage data) |
Phase templates, agent selection |
PerformanceScorer (score data) |
Score warnings for unhealthy agents |
BudgetTuner (usage data) |
Budget tier recommendation |
DataClassifier |
Risk floor, guardrail preset |
PolicyEngine |
Agent assignment policy violations |
RetrospectiveEngine |
Agent drop/prefer recommendations |
KnowledgeRegistry |
Per-step knowledge attachments |
Launcher Configuration¶
ClaudeCodeConfig is the primary runtime configuration point:
| Field | Default | Description |
|---|---|---|
claude_path |
"claude" |
Path to claude binary |
default_timeout_seconds |
600.0 | Timeout when no model-specific override |
model_timeouts |
opus:900, sonnet:600, haiku:300 | Per-model timeouts |
max_retries |
3 | Retry attempts on rate-limit |
base_retry_delay |
5.0 | Base delay for exponential backoff |
max_outcome_length |
4000 | Max chars kept from agent outcome |
prompt_file_threshold |
131072 | Bytes above which prompt goes via stdin |
env_passthrough |
ANTHROPIC_API_KEY, etc. | Environment variables forwarded |
12. Error Handling¶
Step Failures¶
When a step fails (status="failed"):
- The StepResult is recorded with the error message.
- _determine_action() detects the failed step and returns
ActionType.FAILED.
- The execution status becomes "failed".
- A step.failed event is published.
Gate Failures¶
When a gate fails:
- record_gate_result(passed=False) sets status = "failed".
- A gate.failed event is published.
- Subsequent next_action() calls return ActionType.FAILED.
Rate-Limit Retry¶
ClaudeCodeLauncher implements automatic retry with exponential
backoff:
- Detects rate-limit responses by checking for "rate limit" or "429"
in error text.
- Retries up to max_retries times (default 3).
- Delay: base_retry_delay * 2^(attempt-1) seconds (5s, 10s, 20s).
Timeout Handling¶
- Per-model timeouts are applied via
asyncio.wait_for(). - On timeout: the subprocess is killed, and a
LaunchResultwithstatus="failed"anderror="Agent timed out after Ns"is returned.
Crash Recovery¶
The entire system is designed for crash resilience:
- Atomic writes:
StatePersistencewrites to.json.tmpthen renames, preventing partial-write corruption. - State on every mutation: the engine saves state after every
record_*call. - Resume:
baton execute resumeloads state from disk and continues from where execution left off. - Dispatched recovery:
recover_dispatched_steps()clears stale dispatched markers after a daemon crash so steps are re-dispatched. - PID file locking:
WorkerSupervisorusesflock()to prevent double-start. The lock is released automatically when the process exits.
Approval Rejection¶
When record_approval_result(result="reject"):
- Status becomes "failed".
- Execution stops. The caller must decide whether to amend and retry.
Approval With Feedback¶
When record_approval_result(result="approve-with-feedback"):
- A remediation phase is inserted after the current phase via
_amend_from_feedback().
- The phase contains a single step assigned to the first agent in the
current phase, with a task description derived from the feedback.
- Execution continues into the new remediation phase.
Knowledge Gap Escalation¶
Gaps are handled without blocking execution when possible: - auto-resolve: re-dispatch with resolved knowledge (no human intervention). - best-effort: log and continue (LOW risk, low intervention). - queue-for-gate: surface at next human approval gate (all other cases).
13. Team Collaboration¶
Agent Baton supports structured multi-agent collaboration within plan steps via team steps. A team step assigns multiple agents to a single plan step, each with a defined role and dependency ordering.
Team Step Structure¶
A PlanStep with a non-empty team list is a team step. Each
TeamMember has a member_id (e.g. "1.1.a"), role (lead,
implementer, reviewer), and optional depends_on list for intra-step
sequencing.
Synthesis Strategies¶
When all team members complete, their outputs are merged via the
step's SynthesisSpec:
| Strategy | Behavior |
|---|---|
concatenate (default) |
Join outcomes with "; ", collect all files |
merge_files |
Same as concatenate but deduplicate files_changed |
agent_synthesis |
Mark for synthesis agent dispatch (future) |
Conflict Detection and Escalation¶
The engine detects conflicts when two or more team members modify the
same file. The conflict_handling field on SynthesisSpec controls
the response:
| Mode | Behavior |
|---|---|
auto_merge (default) |
Complete the step; record conflict in retrospective |
escalate |
Pause step, set state to approval_pending, surface both positions to human |
fail |
Fail the step if conflict detected |
Conflicts are recorded as ConflictRecord instances in the
retrospective, preserving each agent's position and evidence.
Team Composition Tracking¶
After execution, TeamCompositionRecord entries are collected from
team steps and written to retrospective JSON sidecars. These feed:
PatternLearner.analyze_team_patterns()-- identifies effective team compositions across projectsPerformanceScorer.score_teams()-- aggregates team scorecardsbaton scores --teams-- CLI report of team effectivenessPatternLearner.get_team_cost_estimate()-- historical token cost by team composition
Selective MCP Pass-through¶
Team steps (and solo steps) can declare MCP server dependencies via
the mcp_servers field on PlanStep. Only declared servers are
passed to the agent subprocess via --mcp-config, avoiding input
token bloat from unused MCP tool schemas.
Default: no MCP servers inherited. This is a deliberate design choice to prevent context window waste from tool schemas agents don't need.
14. Post-Execution: Commit Consolidation¶
CommitConsolidator¶
Source: agent_baton/core/engine/consolidator.py (lazily imported)
After execution completes, the CommitConsolidator consolidates agent
commits into a clean, mergeable history via cherry-pick rebase.
Process:
- Topological sort of agent commits by dependency ordering.
- Cherry-pick each commit onto the feature branch in dependency order.
- Compute per-file diff stats and agent attribution after each pick.
- Detect conflicts and record them in the
ConsolidationResult.
ConsolidationResult (models/execution.py):
| Field | Type | Description |
|---|---|---|
status |
str | success, partial, failed |
attributions |
list[ChangelistFile] | Per-file diff stats with agent attribution |
conflicts |
list[str] | Files with merge conflicts |
error |
str | Error message if failed |
ChangelistFile (models/execution.py):
| Field | Type | Description |
|---|---|---|
path |
str | File path relative to repo root |
agent_name |
str | Agent that modified the file |
insertions |
int | Lines added |
deletions |
int | Lines removed |
status |
str | added, modified, deleted |
The ConsolidationResult is stored on ExecutionState.consolidation_result
and consumed by the PMO changelist/merge/PR API endpoints.
PlanReviewer¶
Source: agent_baton/core/engine/plan_reviewer.py
Post-generation plan quality review, wired into
IntelligentPlanner.create_plan() at step 12c.5. Two review strategies:
- Haiku review -- cheap LLM call (~2000 tokens) analyzing plan structure and returning JSON recommendations. Used for medium+ complexity plans when the Anthropic SDK is available.
- Heuristic review -- deterministic fallback using file-path clustering and task-description analysis. Catches the most common case: single-step work phases spanning 4+ files across 3+ directories.
Recommendations include: step splitting, missing dependency edges, scope imbalance warnings, and same-agent team suggestions for coupled concerns.
15. Async Sessions and Resource Management¶
Session Persistence¶
SessionState wraps ExecutionState with multi-day workflow metadata:
- Participants -- agents and humans who contributed, with contribution counts
- Checkpoints -- snapshot points for safe resumption after daemon restart or manual pause
- Lifecycle -- active → paused → resumed → completed
Multi-party Async Contributions¶
ContributionRequest extends the decision protocol for open-ended
multi-party input. Unlike DecisionRequest (binary choice), a
contribution stays open until all named contributors respond.
API: DecisionManager.request_contribution(), .contribute(),
.get_contribution(), .pending_contributions().
Events: contribution.requested and contribution.ready are
published via EventBus when all inputs arrive.
Resource Constraints¶
ResourceLimits on MachinePlan governs execution resource usage:
| Limit | Default | Enforced By |
|---|---|---|
max_concurrent_agents |
8 | StepScheduler semaphore |
max_concurrent_executions |
3 | WorkerSupervisor |
max_tokens_per_minute |
0 (unlimited) | Not yet enforced |
Token budget warnings: _check_token_budget() compares cumulative
estimated_tokens against tier thresholds (lean=50k, standard=500k,
full=2M). Warnings are logged and appended to step deviations for
retrospective tracking.
16. Intelligence and Cost Prediction¶
Team Cost Estimation¶
The planner consults PatternLearner.get_team_cost_estimate() for
team steps, surfacing historical token costs:
- In
explain_plan()-- "Team Cost Estimates" section with per-step and total estimates - In
shared_context-- budget percentage so agents are aware of resource constraints
Budget-Aware Planning¶
Team cost estimates are compared against the plan's budget_tier
threshold. The shared context includes the cost as a percentage of
budget (e.g. "~45,000 tokens (90% of lean budget)") so dispatched
agents can self-regulate scope.
Appendix: Data Models¶
Key Models (agent_baton/models/execution.py)¶
| Class | Purpose |
|---|---|
MachinePlan |
Complete execution plan (task, phases, steps, gates) |
PlanPhase |
A phase with steps and an optional gate |
PlanStep |
A single agent assignment |
PlanGate |
A QA gate definition |
TeamMember |
A member of a team step |
PlanAmendment |
Audit record of a plan modification |
ExecutionState |
Persistent state of a running execution |
StepResult |
Outcome of a step execution |
TeamStepResult |
Outcome of a team member's work |
GateResult |
Outcome of a gate check |
ApprovalResult |
Outcome of a human approval |
ExecutionAction |
Instruction from engine to driving session |
ActionType |
Enum: DISPATCH, GATE, COMPLETE, FAILED, WAIT, APPROVAL |
SynthesisSpec |
Team output merge strategy (concatenate/merge_files/agent_synthesis) |
ConsolidationResult |
Outcome of cherry-pick rebase (status, attributions, conflicts) |
ChangelistFile |
Per-file diff stats with agent attribution |
Team and Collaboration Models¶
| Class | Module | Purpose |
|---|---|---|
TeamCompositionRecord |
models/retrospective.py |
Records which agents worked as a team and outcome |
ConflictRecord |
models/retrospective.py |
Structured disagreement between agents |
TeamPattern |
models/pattern.py |
Recurring team composition pattern from usage logs |
TeamScorecard |
core/improve/scoring.py |
Performance scorecard for a team composition |
SessionState |
models/session.py |
Multi-day session wrapper with checkpoints |
SessionCheckpoint |
models/session.py |
Snapshot point for safe resumption |
SessionParticipant |
models/session.py |
Agent or human participant in a session |
ContributionRequest |
models/decision.py |
Multi-party async input collection |
ResourceLimits |
models/parallel.py |
Concurrency and token budget constraints |
Key Enums (agent_baton/models/enums.py)¶
| Enum | Values |
|---|---|
RiskLevel |
LOW, MEDIUM, HIGH, CRITICAL |
BudgetTier |
Lean (1-2), Standard (3-5), Full (6-8) |
GitStrategy |
Commit-per-agent, Branch-per-agent, None |
ExecutionMode |
Parallel Independent, Sequential Pipeline, Phased Delivery |
GateOutcome |
PASS, PASS WITH NOTES, FAIL |
Decision Models (agent_baton/models/decision.py)¶
| Class | Purpose |
|---|---|
DecisionRequest |
Human decision request (pending/resolved/expired) |
DecisionResolution |
Resolution of a decision (option + rationale) |
ContributionRequest |
Multi-party async input with contributor tracking |