# Execution Engine
The execution engine is the runtime that takes a validated graph and executes it, resolving data bindings, managing state, and coordinating parallel/async execution.
## GraphExecutor
The core orchestrator:
```python
from honey_badgeria.back.runtime.executor import GraphExecutor

executor = GraphExecutor(
    graph,
    handler_resolver=resolver,
    cache_enabled=True,
    parallel_enabled=True,
    async_enabled=False,
    max_workers=4,
    transaction_backend=backend,
)
result = executor.execute(initial_data={"key": "value"})
```
## Execution Pipeline
1. **Compute execution stages** — `GraphTopology.execution_stages()` returns `list[list[Vertex]]`, where vertices in the same stage share no dependencies.
2. **Group stages by atomicity** — consecutive stages belonging to the same atomic group are merged into execution "chunks."
3. **Execute each chunk:**
    - **Atomic chunk:** wrapped in `AtomicContext` (snapshot → execute → commit/rollback).
    - **Regular chunk:** executed stage by stage.
4. **For each stage:**
    - **Serial mode:** execute vertices one by one.
    - **Parallel mode:** execute same-stage vertices concurrently via `ThreadPoolExecutor`.
5. **For each vertex:**
    - Resolve inputs from the `DataStore` via `DataFlowEngine`.
    - Check the cache (if enabled and the vertex is pure).
    - Execute the handler function.
    - Validate the output via its contract (if defined).
    - Store outputs in the `DataStore` under qualified names.
    - Record lineage.
    - Update state.
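The staging step can be sketched in plain Python. This is an illustrative reconstruction, not the library's code: `execution_stages` here is a stand-in for `GraphTopology.execution_stages()`, computed by topological layering so that each stage depends only on earlier stages.

```python
# Illustrative sketch (not the honey_badgeria implementation): layer a
# dependency graph into stages. Vertices in the same stage share no
# dependencies, so each stage is safe to run in parallel.
def execution_stages(deps: dict[str, set[str]]) -> list[list[str]]:
    remaining = {v: set(d) for v, d in deps.items()}
    stages = []
    while remaining:
        # A stage is every vertex whose dependencies are all satisfied.
        ready = [v for v, d in remaining.items() if not d]
        if not ready:
            raise ValueError("cycle detected")
        stages.append(sorted(ready))
        for v in ready:
            del remaining[v]
        for d in remaining.values():
            d.difference_update(ready)
    return stages

deps = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}}
print(execution_stages(deps))  # [['A'], ['B', 'C'], ['D']]
```

Python's standard library offers `graphlib.TopologicalSorter` for the same ordering problem, though it yields vertices one at a time rather than in stages.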
## The run_flow() Shortcut

For most use cases, `run_flow()` is the simplest entry point:
```python
from honey_badgeria.back.runtime import run_flow

result = run_flow(
    graph,
    flow_name="create_user",  # optional; runs the full graph if None
    handlers={
        "normalize": normalize_fn,
        "validate": validate_fn,
        "save": save_fn,
    },
    initial_data={"name": "alice"},
    cache_enabled=False,
    parallel_enabled=False,
    async_enabled=False,
    max_workers=4,
    transaction_backend=None,
)
```
## run_flow_async()
For async handlers:
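`run_flow_async()` itself lives in the library; the sketch below only illustrates the pattern it enables, assuming same-stage handlers can be awaited concurrently because they share no dependencies:

```python
# Illustrative sketch, not run_flow_async() itself: async handlers in the
# same stage are awaited concurrently instead of running in worker threads.
import asyncio

async def normalize(data: dict) -> dict:
    await asyncio.sleep(0)  # stand-in for real async I/O
    return {"username": data["name"].strip().lower()}

async def validate(data: dict) -> dict:
    await asyncio.sleep(0)
    return {"ok": bool(data["name"])}

async def run_stage(data: dict) -> list:
    # Vertices in the same stage share no dependencies, so their
    # coroutines can be gathered safely; results keep argument order.
    return await asyncio.gather(normalize(data), validate(data))

print(asyncio.run(run_stage({"name": "  Alice "})))
# [{'username': 'alice'}, {'ok': True}]
```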
## DataFlowEngine
Resolves input references and stores outputs. This is the data plumbing layer.
```python
from honey_badgeria.back.runtime.data_flow import DataFlowEngine

engine = DataFlowEngine(data_store)

# Resolve inputs for a vertex
resolved = engine.resolve_inputs(vertex_b)
# {"x": 42, "name": "Alice"} — resolved from the data store

# Store outputs from a vertex
engine.store_outputs(vertex_a, {"id": 123, "name": "Alice"})
# Stored as: {"vertex_a.id": 123, "vertex_a.name": "Alice"}
```
### Resolution Rules
- If an input value contains a `.` (e.g., `normalize.username`), it's a data binding: the engine looks up `normalize.username` in the DataStore.
- If the key is not found, a `DataResolutionError` is raised.
- If the value is a plain type name (e.g., `str`), it's a type declaration and is skipped during runtime resolution.
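These rules can be condensed into a short sketch. The semantics are assumed from the description above, and `TYPE_NAMES` is a hypothetical set standing in for however the library recognizes type declarations:

```python
# Sketch of the resolution rules (assumed semantics, not the library's code).
TYPE_NAMES = {"str", "int", "float", "bool", "dict", "list"}

class DataResolutionError(KeyError):
    pass

def resolve_inputs(declared: dict, store: dict) -> dict:
    resolved = {}
    for name, ref in declared.items():
        if ref in TYPE_NAMES:
            continue                      # type declaration: skip at runtime
        if "." in ref:
            if ref not in store:
                raise DataResolutionError(ref)
            resolved[name] = store[ref]   # data binding: qualified-key lookup
    return resolved

store = {"normalize.username": "alice"}
print(resolve_inputs({"user": "normalize.username", "name": "str"}, store))
# {'user': 'alice'}
```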
### Output Qualification
When a vertex produces outputs, they are stored with qualified names:
# Vertex "fetch_user" returns {"id": 123, "name": "Alice"}
# Stored in DataStore as:
# "fetch_user.id" → 123
# "fetch_user.name" → "Alice"
This is what makes data bindings work. When another vertex declares `inputs: {user_id: fetch_user.id}`, the engine resolves `fetch_user.id` to `123`.
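The qualification step itself is simple; a minimal sketch, assuming outputs are keyed as `"<vertex_id>.<output_name>"`:

```python
# Sketch of output qualification (assumed behavior): prefixing outputs
# with the producing vertex's id makes downstream bindings unambiguous.
def store_outputs(store: dict, vertex_id: str, outputs: dict) -> None:
    for name, value in outputs.items():
        store[f"{vertex_id}.{name}"] = value

store: dict = {}
store_outputs(store, "fetch_user", {"id": 123, "name": "Alice"})
print(store)  # {'fetch_user.id': 123, 'fetch_user.name': 'Alice'}
```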
## DataStore
Simple key-value storage for intermediate data:
```python
from honey_badgeria.back.runtime.data_store import DataStore

store = DataStore()
store.set_output("vertex_a.result", {"data": [1, 2, 3]})
value = store.get_output("vertex_a.result")  # {"data": [1, 2, 3]}

# Raises DataResolutionError if the key doesn't exist
store.get_output("nonexistent.key")  # DataResolutionError!

# Dump all data
all_data = store.dump()  # dict[str, Any]
```
The DataStore holds the result of execution: after `run_flow()` completes, the returned dict contains all outputs from all vertices.
## StateStore
Tracks vertex execution state for debugging and inspection:
```python
from honey_badgeria.back.runtime.state_store import StateStore

state = StateStore()
state.set_state("fetch_user", "pending")
state.set_state("fetch_user", "running")
state.set_state("fetch_user", "completed")

current = state.get_state("fetch_user")  # "completed"
assert state.all_completed()  # True if all vertices are "completed"
```

Valid states: `"pending"`, `"running"`, `"completed"`, `"failed"`.
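A minimal sketch of the semantics described above, assuming the store validates state names and that `all_completed()` checks every tracked vertex:

```python
# Minimal StateStore sketch (assumed semantics), tracking per-vertex state.
VALID_STATES = {"pending", "running", "completed", "failed"}

class StateStore:
    def __init__(self) -> None:
        self._states: dict[str, str] = {}

    def set_state(self, vertex_id: str, state: str) -> None:
        if state not in VALID_STATES:
            raise ValueError(f"invalid state: {state!r}")
        self._states[vertex_id] = state

    def get_state(self, vertex_id: str) -> str:
        return self._states[vertex_id]

    def all_completed(self) -> bool:
        return all(s == "completed" for s in self._states.values())

state = StateStore()
state.set_state("fetch_user", "completed")
print(state.all_completed())  # True
```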
## DataLineage
Tracks data provenance — where each output came from:
```python
from honey_badgeria.back.runtime.lineage import DataLineage

lineage = DataLineage()
lineage.record(
    output_name="vertex_a.id",
    vertex_id="vertex_a",
    inputs_used=["raw_data"],
)

# Trace the origin of any output
chain = lineage.trace("vertex_c.final")
# Returns a list of transformation steps

# Dump full lineage
all_lineage = lineage.dump()
```
Lineage is especially useful for AI agents trying to understand how a specific output was derived — they can trace it back through the entire computation chain.
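Tracing is essentially a backwards walk over the recorded provenance. A sketch under assumed semantics (the shape of the returned chain is illustrative, not the library's actual format):

```python
# Sketch of lineage tracing (assumed semantics): walk records backwards
# from an output to the raw inputs that produced it.
def record(lineage: dict, output_name: str, vertex_id: str, inputs_used: list) -> None:
    lineage[output_name] = {"vertex": vertex_id, "inputs": inputs_used}

def trace(lineage: dict, output_name: str) -> list:
    """Return (output, producing vertex) steps leading to output_name."""
    chain = []
    frontier = [output_name]
    while frontier:
        name = frontier.pop()
        rec = lineage.get(name)
        if rec is None:
            continue                      # raw input: no producing vertex
        chain.append((name, rec["vertex"]))
        frontier.extend(rec["inputs"])
    return chain

lineage: dict = {}
record(lineage, "vertex_a.id", "vertex_a", ["raw_data"])
record(lineage, "vertex_c.final", "vertex_c", ["vertex_a.id"])
print(trace(lineage, "vertex_c.final"))
# [('vertex_c.final', 'vertex_c'), ('vertex_a.id', 'vertex_a')]
```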
## Execution Flow Diagram
```
┌─────────────────┐
│   run_flow()    │
└────────┬────────┘
         │
┌────────▼────────┐
│  GraphExecutor  │
│   .execute()    │
└────────┬────────┘
         │
┌────────▼────────────────────┐
│ GraphTopology               │
│ .execution_stages()         │
│ → [[A], [B,C], [D]]         │
└──────────────┬──────────────┘
               │
┌──────────────▼──────────────┐
│ For each stage:             │
│                             │
│ Serial: one by one          │
│ Parallel: ThreadPoolExecutor│
│ Async: await coroutines     │
└──────────────┬──────────────┘
               │
┌──────────────▼──────────────┐
│ For each vertex:            │
│                             │
│ 1. Resolve inputs (DataFlow)│
│ 2. Check cache              │
│ 3. Execute handler          │
│ 4. Validate contract        │
│ 5. Store outputs (DataStore)│
│ 6. Record lineage           │
│ 7. Update state             │
└──────────────┬──────────────┘
               │
      ┌────────▼────────┐
      │   Return all    │
      │  outputs (dict) │
      └─────────────────┘
```