Execution Engine

The execution engine is the runtime that takes a validated graph and executes it, resolving data bindings, managing state, and coordinating parallel/async execution.

GraphExecutor

The core orchestrator:

from honey_badgeria.back.runtime.executor import GraphExecutor

executor = GraphExecutor(
    graph,
    handler_resolver=resolver,
    cache_enabled=True,
    parallel_enabled=True,
    async_enabled=False,
    max_workers=4,
    transaction_backend=backend,
)
result = executor.execute(initial_data={"key": "value"})

Execution Pipeline

  1. Compute execution stages — GraphTopology.execution_stages() returns list[list[Vertex]], where vertices in the same stage share no dependencies.
  2. Group stages by atomicity — Consecutive stages belonging to the same atomic group are merged into execution "chunks."
  3. Execute each chunk:
     - Atomic chunk: wrapped in AtomicContext (snapshot → execute → commit/rollback).
     - Regular chunk: executed stage by stage.
  4. For each stage:
     - Serial mode: execute vertices one by one.
     - Parallel mode: execute same-stage vertices concurrently via ThreadPoolExecutor.
  5. For each vertex:
     1. Resolve inputs from the DataStore via DataFlowEngine.
     2. Check the cache (if enabled and the vertex is pure).
     3. Execute the handler function.
     4. Validate the output via its contract (if defined).
     5. Store outputs in the DataStore with qualified names.
     6. Record lineage.
     7. Update state.
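The stage loop above can be sketched with plain concurrent.futures. This is an illustrative sketch only — the stage list, handlers, and the dict standing in for the DataStore are all hypothetical, not GraphExecutor's real internals:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stages and handlers: each "vertex" is just a name mapped to
# a function that reads prior outputs and returns new qualified outputs.
stages = [["a"], ["b", "c"], ["d"]]
handlers = {
    "a": lambda data: {"a.out": 1},
    "b": lambda data: {"b.out": data["a.out"] + 1},
    "c": lambda data: {"c.out": data["a.out"] * 10},
    "d": lambda data: {"d.out": data["b.out"] + data["c.out"]},
}

def run_stages(stages, handlers, parallel=True, max_workers=4):
    data = {}  # plays the role of the DataStore
    for stage in stages:
        if parallel and len(stage) > 1:
            # Same-stage vertices share no dependencies, so they may run concurrently.
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                results = list(pool.map(lambda v: handlers[v](data), stage))
        else:
            results = [handlers[v](data) for v in stage]
        # Outputs only become visible to later stages, never within a stage.
        for out in results:
            data.update(out)
    return data

print(run_stages(stages, handlers))
# {'a.out': 1, 'b.out': 2, 'c.out': 10, 'd.out': 12}
```

Note that outputs are merged only after a stage finishes, which is what makes running same-stage vertices in parallel safe.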

The run_flow() Shortcut

For most use cases, run_flow() is the simplest entry point:

from honey_badgeria.back.runtime import run_flow

result = run_flow(
    graph,
    flow_name="create_user",       # optional; runs full graph if None
    handlers={
        "normalize": normalize_fn,
        "validate": validate_fn,
        "save": save_fn,
    },
    initial_data={"name": "alice"},
    cache_enabled=False,
    parallel_enabled=False,
    async_enabled=False,
    max_workers=4,
    transaction_backend=None,
)
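The handlers themselves are plain callables. The exact calling convention is an assumption in this sketch — handlers are assumed to receive their resolved inputs as keyword arguments and return a dict of named outputs:

```python
# Hypothetical handler implementations for the flow above; signatures are
# an assumption, not a documented contract of run_flow().
def normalize_fn(name: str) -> dict:
    return {"username": name.strip().lower()}

def validate_fn(username: str) -> dict:
    if not username:
        raise ValueError("username must not be empty")
    return {"valid": True}

def save_fn(username: str) -> dict:
    return {"id": 123, "username": username}

print(normalize_fn(name="  Alice "))  # {'username': 'alice'}
```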

run_flow_async()

For async handlers:

result = await run_flow_async(
    graph,
    handlers={...},
    initial_data={...},
    async_enabled=True,
)
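An async handler is an ordinary coroutine function. The handler below is illustrative (its name and signature are not from the library); it only shows the shape a coroutine handler would take:

```python
import asyncio

# Sketch of an async handler; names are hypothetical.
async def fetch_profile(user_id: int) -> dict:
    await asyncio.sleep(0)  # stand-in for real async I/O (HTTP, DB, ...)
    return {"profile": {"id": user_id, "name": "alice"}}

result = asyncio.run(fetch_profile(user_id=7))
print(result)  # {'profile': {'id': 7, 'name': 'alice'}}
```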

DataFlowEngine

Resolves input references and stores outputs. This is the data plumbing layer.

from honey_badgeria.back.runtime.data_flow import DataFlowEngine

engine = DataFlowEngine(data_store)

# Resolve inputs for a vertex
resolved = engine.resolve_inputs(vertex_b)
# {"x": 42, "name": "Alice"}  — resolved from data store

# Store outputs from a vertex
engine.store_outputs(vertex_a, {"id": 123, "name": "Alice"})
# Stored as: {"vertex_a.id": 123, "vertex_a.name": "Alice"}

Resolution Rules

  1. If an input value contains a . (e.g., normalize.username), it's a data binding. The engine looks up normalize.username in the DataStore.
  2. If the key is not found, a DataResolutionError is raised.
  3. If the value is a plain type name (e.g., str), it's a type declaration — skipped at runtime resolution.
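The three rules can be sketched against a plain dict standing in for the DataStore. This is a minimal illustration of the rules, not DataFlowEngine's implementation:

```python
class DataResolutionError(KeyError):
    """Stand-in for the library's DataResolutionError."""

def resolve_inputs(declared: dict, store: dict) -> dict:
    resolved = {}
    for name, ref in declared.items():
        if isinstance(ref, str) and "." in ref:
            # Rule 1: a dotted reference is a data binding — look it up.
            if ref not in store:
                # Rule 2: a missing binding is an error.
                raise DataResolutionError(ref)
            resolved[name] = store[ref]
        # Rule 3: plain type names (e.g. "str") are declarations — skipped.
    return resolved

store = {"normalize.username": "alice"}
print(resolve_inputs({"username": "normalize.username", "name": "str"}, store))
# {'username': 'alice'}
```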

Output Qualification

When a vertex produces outputs, they are stored with qualified names:

# Vertex "fetch_user" returns {"id": 123, "name": "Alice"}
# Stored in DataStore as:
#   "fetch_user.id"   → 123
#   "fetch_user.name" → "Alice"

This is what makes data bindings work. When another vertex declares inputs: {user_id: fetch_user.id}, the engine resolves fetch_user.id to 123.
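Qualification itself is just key prefixing, as this one-function sketch shows (the function name is illustrative):

```python
def qualify_outputs(vertex_id: str, outputs: dict) -> dict:
    # Prefix every output key with the producing vertex's id.
    return {f"{vertex_id}.{key}": value for key, value in outputs.items()}

print(qualify_outputs("fetch_user", {"id": 123, "name": "Alice"}))
# {'fetch_user.id': 123, 'fetch_user.name': 'Alice'}
```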

DataStore

Simple key-value storage for intermediate data:

from honey_badgeria.back.runtime.data_store import DataStore

store = DataStore()

store.set_output("vertex_a.result", {"data": [1, 2, 3]})
value = store.get_output("vertex_a.result")  # {"data": [1, 2, 3]}

# Raises DataResolutionError if key doesn't exist
store.get_output("nonexistent.key")  # DataResolutionError!

# Dump all data
all_data = store.dump()  # dict[str, Any]

The DataStore is the result of execution — after run_flow() completes, the returned dict contains all outputs from all vertices.

StateStore

Tracks vertex execution state for debugging and inspection:

from honey_badgeria.back.runtime.state_store import StateStore

state = StateStore()

state.set_state("fetch_user", "pending")
state.set_state("fetch_user", "running")
state.set_state("fetch_user", "completed")

current = state.get_state("fetch_user")  # "completed"
assert state.all_completed()  # True if all vertices are "completed"

Valid states: "pending", "running", "completed", "failed".

DataLineage

Tracks data provenance — where each output came from:

from honey_badgeria.back.runtime.lineage import DataLineage

lineage = DataLineage()

lineage.record(
    output_name="vertex_a.id",
    vertex_id="vertex_a",
    inputs_used=["raw_data"],
)

# Trace the origin of any output
chain = lineage.trace("vertex_c.final")
# Returns list of transformation steps

# Dump full lineage
all_lineage = lineage.dump()

Lineage is especially useful for AI agents trying to understand how a specific output was derived — they can trace it back through the entire computation chain.
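The idea behind trace() can be sketched as a backwards walk over recorded steps. The record structure below is an assumption for illustration, not DataLineage's real internals:

```python
# Hypothetical lineage records: output name → producing vertex and inputs used.
records = {
    "vertex_a.id": {"vertex": "vertex_a", "inputs": ["raw_data"]},
    "vertex_b.norm": {"vertex": "vertex_b", "inputs": ["vertex_a.id"]},
    "vertex_c.final": {"vertex": "vertex_c", "inputs": ["vertex_b.norm"]},
}

def trace(output_name: str) -> list:
    """Walk from an output back through the vertices that produced it."""
    steps = []
    rec = records.get(output_name)
    while rec is not None:
        steps.append((rec["vertex"], output_name))
        # Follow the first recorded input that is itself a tracked output;
        # untracked inputs (e.g. initial data) terminate the chain.
        parents = [i for i in rec["inputs"] if i in records]
        if not parents:
            break
        output_name = parents[0]
        rec = records[output_name]
    return steps

print(trace("vertex_c.final"))
# [('vertex_c', 'vertex_c.final'), ('vertex_b', 'vertex_b.norm'), ('vertex_a', 'vertex_a.id')]
```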

Execution Flow Diagram

                    ┌─────────────────┐
                    │   run_flow()    │
                    └────────┬────────┘
                    ┌────────▼────────┐
                    │  GraphExecutor  │
                    │   .execute()    │
                    └────────┬────────┘
             ┌───────────────▼───────────────┐
             │  GraphTopology                │
             │  .execution_stages()          │
             │  → [[A], [B,C], [D]]          │
             └───────────────┬───────────────┘
             ┌───────────────▼───────────────┐
             │  For each stage:              │
             │                               │
             │  Serial: one by one           │
             │  Parallel: ThreadPoolExecutor │
             │  Async: await coroutines      │
             └───────────────┬───────────────┘
             ┌───────────────▼───────────────┐
             │  For each vertex:             │
             │                               │
             │  1. Resolve inputs (DataFlow) │
             │  2. Check cache               │
             │  3. Execute handler           │
             │  4. Validate contract         │
             │  5. Store outputs (DataStore) │
             │  6. Record lineage            │
             │  7. Update state              │
             └───────────────┬───────────────┘
                    ┌────────▼────────┐
                    │  Return all     │
                    │  outputs (dict) │
                    └─────────────────┘