Data Flow¶
Data flow is the mechanism by which data moves between vertices in a DAG. HBIA uses qualified names and data bindings to create an explicit, traceable data pipeline.
How Data Moves¶
When a vertex executes, these steps occur:
1. **Resolve inputs** — the `DataFlowEngine` looks up each input binding in the `DataStore`.
2. **Execute handler** — the resolved inputs are passed as keyword arguments to the handler function.
3. **Store outputs** — the handler's return dict is stored under qualified names (e.g., `vertex_name.output_key`).
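The three steps above can be sketched as a single function. This is a minimal illustration, not HBIA's actual implementation — `execute_vertex` and the plain-dict `DataStore` are assumptions:

```python
# Hypothetical sketch of the execute loop; HBIA's real classes may differ.
def execute_vertex(name, inputs, handler, datastore):
    """Resolve inputs, call the handler, store qualified outputs."""
    # 1. Resolve inputs: look up each data binding in the datastore.
    #    Type declarations (values without a dot) are skipped.
    resolved = {key: datastore[binding]
                for key, binding in inputs.items() if "." in binding}
    # 2. Execute the handler with resolved inputs as keyword arguments.
    outputs = handler(**resolved)
    # 3. Store each output under its qualified name: "<vertex_name>.<key>".
    for key, value in outputs.items():
        datastore[f"{name}.{key}"] = value
    return outputs

store = {}
fetch_out = execute_vertex(
    "fetch", {},
    lambda: {"user": {"id": 1}, "email": "a@b.com"},
    store,
)
validate_out = execute_vertex(
    "validate",
    {"user": "fetch.user", "email": "fetch.email"},
    lambda user, email: {"is_valid": "@" in email},
    store,
)
```

After both calls, `store` holds the qualified keys `fetch.user`, `fetch.email`, and `validate.is_valid`.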
Example¶
flow:
pipeline:
fetch:
handler: vertices.fetch
outputs:
user: dict
email: str
validate:
handler: vertices.validate
inputs:
user: fetch.user # data binding
email: fetch.email # data binding
outputs:
is_valid: bool
At runtime:

1. `fetch` executes and returns `{"user": {"id": 1}, "email": "a@b.com"}`.
2. The outputs are stored as `{"fetch.user": {"id": 1}, "fetch.email": "a@b.com"}`.
3. `validate`'s inputs are resolved: `user={"id": 1}`, `email="a@b.com"`.
4. `validate(user={"id": 1}, email="a@b.com")` is called.
Data Bindings¶
A data binding is an input value that references another vertex's output. The format is `vertex_name.output_key` — the producing vertex's name, a dot, then the output key.
Detection Rule¶
If the input value contains a `.` (dot), it is a data binding. Otherwise, it is a type declaration:
inputs:
name: str # Type declaration (no dot) — skipped at runtime
email: fetch.email # Data binding (has dot) — resolved from DataStore
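The rule reduces to a single membership check; a sketch (the function name `is_binding` is hypothetical):

```python
def is_binding(value: str) -> bool:
    # A dot marks a data binding; anything else is a type declaration.
    return "." in value
```

So `is_binding("fetch.email")` is true, while `is_binding("str")` is false.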
Resolution¶
At runtime, DataFlowEngine.resolve_inputs() processes each binding:
# For vertex with inputs: {"email": "fetch.email", "name": "str"}
resolved = engine.resolve_inputs(vertex)
# Returns: {"email": "a@b.com"}
# "name" is skipped because "str" is a type declaration, not a binding
If a binding cannot be found in the DataStore, HBIA raises DataResolutionError:
# fetch.nonexistent doesn't exist → error
DataResolutionError: "Cannot resolve input 'fetch.nonexistent'"
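Resolution and its failure case can be sketched together. This is an assumed reimplementation for illustration — the stand-in `DataResolutionError` class and this `resolve_inputs` body are not HBIA's actual code:

```python
# Hypothetical stand-in; HBIA's real DataResolutionError may differ.
class DataResolutionError(KeyError):
    pass

def resolve_inputs(inputs: dict, datastore: dict) -> dict:
    """Resolve data bindings against the datastore; skip type declarations."""
    resolved = {}
    for key, value in inputs.items():
        if "." not in value:
            continue  # type declaration such as "str": skipped at runtime
        if value not in datastore:
            raise DataResolutionError(f"Cannot resolve input '{value}'")
        resolved[key] = datastore[value]
    return resolved

store = {"fetch.email": "a@b.com"}
resolved = resolve_inputs({"email": "fetch.email", "name": "str"}, store)
# resolved == {"email": "a@b.com"}
```

A lookup for a missing key such as `fetch.nonexistent` raises the error instead of returning a partial result.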
Output Qualification¶
Outputs are always stored with the vertex name prefix:
# Handler returns:
{"user_id": 123, "created": True}
# Stored as:
{"save.user_id": 123, "save.created": True}
This prevents naming collisions across vertices and makes every output globally addressable.
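The qualification rule itself is a one-line transformation; a sketch (the function name `qualify` is hypothetical):

```python
def qualify(vertex_name: str, outputs: dict) -> dict:
    # Prefix each output key with the vertex name, making it globally addressable.
    return {f"{vertex_name}.{key}": value for key, value in outputs.items()}

qualified = qualify("save", {"user_id": 123, "created": True})
# qualified == {"save.user_id": 123, "save.created": True}
```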
Initial Data¶
When you call run_flow(), you can pass initial data that is made available to entry vertices.
Initial data is stored in the DataStore before execution begins. Entry vertices can reference it in their inputs.
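For example (the `user_id` key is hypothetical, and `graph`/`handlers` come from earlier setup):

```python
result = run_flow(
    graph,
    handlers={...},
    initial_data={"user_id": 42},  # stored in the DataStore before any vertex runs
)
```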
Fan-Out and Fan-In¶
Fan-Out¶
Multiple vertices can read from the same output:
fetch:
outputs:
data: dict
next:
- analyze_text
- analyze_images
analyze_text:
inputs:
data: fetch.data # Both read from fetch.data
analyze_images:
inputs:
data: fetch.data # Same data, different processing
Fan-In¶
A vertex can read from multiple upstream vertices:
combine:
inputs:
text_score: analyze_text.score
image_score: analyze_images.score
# Combines outputs from two different vertices
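A fan-in handler simply declares one parameter per binding. A hypothetical handler for the `combine` vertex above (the averaging logic is purely illustrative):

```python
def combine(text_score: float, image_score: float) -> dict:
    # Receives the resolved values of analyze_text.score and analyze_images.score.
    return {"combined_score": (text_score + image_score) / 2}
```

Its output would be stored as `combine.combined_score`.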
The Result Dictionary¶
After execution, run_flow() returns a dictionary containing all outputs from all vertices:
result = run_flow(graph, handlers={...}, initial_data={...})
# Access any output by qualified name
user_id = result["save.user_id"]
is_valid = result["validate.is_valid"]
This flat structure makes it easy for both humans and AI agents to inspect the complete execution result.