Data Flow

Data flow is the mechanism by which data moves between vertices in a DAG. HBIA uses qualified names and data bindings to create an explicit, traceable data pipeline.

How Data Moves

When a vertex executes, these steps occur:

  1. Resolve inputs — The DataFlowEngine looks up each input binding in the DataStore.
  2. Execute handler — The resolved inputs are passed as keyword arguments to the handler function.
  3. Store outputs — The handler's return dict is stored with qualified names (e.g., vertex_name.output_key).
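The three steps above can be sketched in a few lines, modeling the DataStore as a flat dict (that representation, and the simplified function shape, are assumptions for illustration; HBIA's real DataFlowEngine lives in the library):

```python
def execute_vertex(name, handler, bindings, store):
    # 1. Resolve inputs: look up each data binding (a value containing ".") in the store.
    resolved = {key: store[ref] for key, ref in bindings.items() if "." in ref}
    # 2. Execute handler: resolved inputs are passed as keyword arguments.
    outputs = handler(**resolved)
    # 3. Store outputs: qualify each key with the vertex name.
    for key, value in outputs.items():
        store[f"{name}.{key}"] = value
    return outputs

# Upstream outputs already in the store:
store = {"fetch.user": {"id": 1}, "fetch.email": "a@b.com"}

def validate(user, email):
    return {"is_valid": "@" in email and "id" in user}

execute_vertex("validate", validate,
               {"user": "fetch.user", "email": "fetch.email"}, store)
print(store["validate.is_valid"])  # True
```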

Example

flow:
  pipeline:
    fetch:
      handler: vertices.fetch
      outputs:
        user: dict
        email: str

    validate:
      handler: vertices.validate
      inputs:
        user: fetch.user        # data binding
        email: fetch.email      # data binding
      outputs:
        is_valid: bool

At runtime:

  1. fetch executes and returns {"user": {"id": 1}, "email": "a@b.com"}.
  2. Outputs stored as {"fetch.user": {"id": 1}, "fetch.email": "a@b.com"}.
  3. validate inputs are resolved: user={"id": 1}, email="a@b.com".
  4. validate(user={"id": 1}, email="a@b.com") is called.
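Hypothetical handler implementations matching the flow above (the module path vertices.fetch / vertices.validate comes from the YAML; the function bodies are illustrative):

```python
def fetch():
    # Entry vertex: returns a dict whose keys match its declared outputs.
    return {"user": {"id": 1}, "email": "a@b.com"}

def validate(user, email):
    # Receives fetch's outputs as keyword arguments via the data bindings.
    return {"is_valid": "@" in email}

print(validate(**fetch()))  # {'is_valid': True}
```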

Data Bindings

A data binding is an input value that references another vertex's output. The format is:

vertex_name.output_name

Detection Rule

If the input value contains a ., it's a data binding. Otherwise, it's a type declaration:

inputs:
  name: str              # Type declaration (no dot) — skipped at runtime
  email: fetch.email     # Data binding (has dot) — resolved from DataStore
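The rule reduces to a one-line check, shown here as a standalone sketch:

```python
def is_data_binding(value: str) -> bool:
    # A dot means a "vertex.output" reference; anything else is a type declaration.
    return "." in value

print(is_data_binding("fetch.email"))  # True
print(is_data_binding("str"))          # False
```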

Resolution

At runtime, DataFlowEngine.resolve_inputs() processes each binding:

# For vertex with inputs: {"email": "fetch.email", "name": "str"}
resolved = engine.resolve_inputs(vertex)
# Returns: {"email": "a@b.com"}  
# "name" is skipped because "str" is a type declaration, not a binding

If a binding cannot be found in the DataStore, HBIA raises DataResolutionError:

# fetch.nonexistent doesn't exist → error
DataResolutionError: "Cannot resolve input 'fetch.nonexistent'"
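The failure path can be modeled with a stand-in exception and a dict-based store (HBIA's actual DataResolutionError class and its import path live in the library; this sketch only mirrors the behavior):

```python
class DataResolutionError(KeyError):
    """Stand-in for HBIA's error; raised when a binding is missing."""

def resolve_binding(store, ref):
    try:
        return store[ref]
    except KeyError:
        raise DataResolutionError(f"Cannot resolve input '{ref}'") from None

store = {"fetch.email": "a@b.com"}
resolve_binding(store, "fetch.email")          # returns "a@b.com"
# resolve_binding(store, "fetch.nonexistent")  # raises DataResolutionError
```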

Output Qualification

Outputs are always stored with the vertex name prefix:

# Handler returns:
{"user_id": 123, "created": True}

# Stored as:
{"save.user_id": 123, "save.created": True}

This prevents naming collisions across vertices and makes every output globally addressable.
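The qualification step amounts to prefixing every returned key, e.g. (a minimal sketch, not the library's internal function):

```python
def qualify(vertex_name, outputs):
    # Prefix every output key with the producing vertex's name.
    return {f"{vertex_name}.{key}": value for key, value in outputs.items()}

print(qualify("save", {"user_id": 123, "created": True}))
# {'save.user_id': 123, 'save.created': True}
```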

Initial Data

When you call run_flow(), you can pass initial data that is available to entry vertices:

result = run_flow(
    graph,
    handlers={...},
    initial_data={"username": "alice", "email": "a@b.com"},
)

Initial data is stored in the DataStore before execution begins. Entry vertices can reference it in their inputs.

Fan-Out and Fan-In

Fan-Out

Multiple vertices can read from the same output:

fetch:
  outputs:
    data: dict
  next:
    - analyze_text
    - analyze_images

analyze_text:
  inputs:
    data: fetch.data       # Both read from fetch.data

analyze_images:
  inputs:
    data: fetch.data       # Same data, different processing

Fan-In

A vertex can read from multiple upstream vertices:

combine:
  inputs:
    text_score: analyze_text.score
    image_score: analyze_images.score
  # Combines outputs from two different vertices
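A fan-in handler simply takes one keyword argument per upstream binding. The combine function below is hypothetical (the averaging logic is illustrative, not from the docs):

```python
def combine(text_score, image_score):
    # Each score was resolved from a different upstream vertex's output.
    return {"score": (text_score + image_score) / 2}

print(combine(text_score=80, image_score=60))  # {'score': 70.0}
```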

The Result Dictionary

After execution, run_flow() returns a dictionary containing all outputs from all vertices:

result = run_flow(graph, handlers={...}, initial_data={...})

# Access any output by qualified name
user_id = result["save.user_id"]
is_valid = result["validate.is_valid"]

This flat structure makes it easy for both humans and AI agents to inspect the complete execution result.
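Because every key follows the vertex.output pattern, the flat result can be regrouped per vertex with a simple split. This is a convenience sketch, not an HBIA API:

```python
def group_by_vertex(result):
    # Turn {"save.user_id": 123, ...} into {"save": {"user_id": 123}, ...}.
    grouped = {}
    for qualified, value in result.items():
        vertex, _, output = qualified.partition(".")
        grouped.setdefault(vertex, {})[output] = value
    return grouped

result = {"save.user_id": 123, "validate.is_valid": True}
print(group_by_vertex(result))
# {'save': {'user_id': 123}, 'validate': {'is_valid': True}}
```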