Atomicity¶
Atomic groups provide ACID-like semantics for critical sections of your DAG. When a group of vertices must succeed or fail as a unit, you wrap them in an atomic group.
Overview¶
An atomic group declares a set of vertices that execute as a single all-or-nothing transaction:
atomic_groups:
  payment_processing:
    vertices: [reserve_funds, charge, confirm]
    on_failure: rollback
    no_cache: true
    no_parallel: true
If any vertex in the group fails, the entire group is rolled back to its pre-execution state.
How It Works¶
Lifecycle¶
- Snapshot: Before the first vertex executes, AtomicContext captures snapshots of the DataStore, StateStore, and DataLineage.
- Execute: Group vertices execute according to the failure policy.
- Success: Snapshots are discarded and changes are committed via TransactionBackend.commit().
- Failure: Snapshots are restored, TransactionBackend.rollback() is called, and the exception is re-raised.
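The lifecycle above can be approximated with a small context manager. This is a minimal sketch under stated assumptions, not HBIA's implementation: `snapshot_scope` is a hypothetical name, plain dicts stand in for the DataStore and StateStore, and `copy.deepcopy` stands in for whatever snapshot mechanism AtomicContext actually uses.

```python
import copy
from contextlib import contextmanager


@contextmanager
def snapshot_scope(*stores):
    """Hypothetical stand-in for AtomicContext: snapshot, run, restore on error."""
    # Snapshot: deep-copy each store before the first vertex runs.
    snapshots = [copy.deepcopy(store) for store in stores]
    try:
        yield
    except Exception:
        # Failure: restore every store in place, then re-raise.
        for store, snapshot in zip(stores, snapshots):
            store.clear()
            store.update(snapshot)
        raise
    # Success: the snapshots are simply dropped when the function returns.


data_store = {"balance": 100}
state_store = {"reserve_funds": "pending"}

try:
    with snapshot_scope(data_store, state_store):
        data_store["balance"] -= 40            # reserve_funds succeeds
        state_store["reserve_funds"] = "done"
        raise RuntimeError("charge failed")    # charge fails
except RuntimeError as exc:
    failure = str(exc)
```

After the block runs, both dicts hold their original contents and the caller still sees the original exception, which mirrors the restore-then-re-raise behavior described in the Failure step.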
AtomicContext¶
The AtomicContext is a context manager that wraps group execution:
from honey_badgeria.back.atomicity import AtomicContext

with AtomicContext(group, backend, data_store, state_store, lineage, logger):
    # Execute group vertices
    for vertex in group.vertices:
        executor._execute_vertex(vertex)
# On success → backend.commit()
# On exception → backend.rollback(), snapshots restored
Configuration¶
In YAML¶
atomic_groups:
  payment_processing:
    vertices: [reserve_funds, charge, confirm]
    on_failure: rollback    # Failure policy
    no_cache: true          # Bypass cache in this group
    no_parallel: true       # Force serial execution
Failure Policies¶
| Policy | Behavior | When to Use |
|---|---|---|
| rollback | Restore DataStore, StateStore, and Lineage snapshots | Database transactions, in-memory operations |
| compensate | Execute SAGA compensating handlers in reverse order | External API calls, third-party services |
| abort | Stop execution immediately with no cleanup | Unrecoverable failures |
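To make the compensate policy concrete, here is a minimal sketch of running compensating handlers in reverse order. `run_with_compensation` and the step names are hypothetical illustrations, not HBIA APIs; the sketch also assumes that a step which never completed has nothing to compensate.

```python
def run_with_compensation(steps):
    """Run (action, compensation) pairs; on failure undo completed steps in reverse.

    Hypothetical helper illustrating the compensate policy, not an HBIA API.
    """
    completed = []
    for action, compensation in steps:
        try:
            action()
        except Exception:
            # Undo only the steps that finished, newest first.
            for undo in reversed(completed):
                undo()
            raise
        completed.append(compensation)


log = []


def fail_charge():
    raise RuntimeError("charge declined")


steps = [
    (lambda: log.append("reserve"), lambda: log.append("release_reservation")),
    (lambda: log.append("notify"), lambda: log.append("retract_notification")),
    (fail_charge, lambda: log.append("refund")),
]

try:
    run_with_compensation(steps)
except RuntimeError:
    pass
```

The refund handler never runs because the charge itself never succeeded, while the two completed steps are undone newest first.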
no_cache¶
When no_cache: true (default), vertices in the group always execute fresh, even if CACHE_ENABLED=True globally. This ensures atomic operations see current state, not stale cached results.
no_parallel¶
When no_parallel: true (default), vertices in the group execute serially, even if PARALLEL_ENABLED=True globally. This prevents race conditions in transactional workflows.
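The configuration keys and their defaults can be summarized as a small typed structure. This is a sketch only: `AtomicGroupConfig` and `parse_atomic_groups` are hypothetical names, and treating on_failure as a required key is an assumption, since no default for it is stated here.

```python
from dataclasses import dataclass

VALID_POLICIES = {"rollback", "compensate", "abort"}


@dataclass(frozen=True)
class AtomicGroupConfig:
    """Hypothetical typed view of one atomic_groups entry (not an HBIA class)."""
    name: str
    vertices: tuple
    on_failure: str            # assumed required; no default is documented
    no_cache: bool = True      # documented default
    no_parallel: bool = True   # documented default

    def __post_init__(self):
        if not self.vertices:
            raise ValueError(f"atomic group {self.name!r} has no vertices")
        if self.on_failure not in VALID_POLICIES:
            raise ValueError(f"unknown failure policy: {self.on_failure!r}")


def parse_atomic_groups(raw):
    """Build configs from the mapping found under the atomic_groups key."""
    return {
        name: AtomicGroupConfig(
            name=name,
            vertices=tuple(spec["vertices"]),
            on_failure=spec["on_failure"],
            no_cache=spec.get("no_cache", True),
            no_parallel=spec.get("no_parallel", True),
        )
        for name, spec in raw.items()
    }


groups = parse_atomic_groups({
    "payment_processing": {
        "vertices": ["reserve_funds", "charge", "confirm"],
        "on_failure": "rollback",
    }
})
```

Omitting no_cache and no_parallel from the mapping leaves both set to true, matching the defaults described above.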
TransactionBackend¶
The TransactionBackend is an abstract interface for integrating with external transaction systems:
from abc import ABC

from honey_badgeria.back.atomicity.backends import TransactionBackend

# Interface definition, shown for reference:
class TransactionBackend(ABC):
    def save_snapshot(self, group_name: str, data: dict) -> None: ...
    def rollback(self, group_name: str, snapshot_data: dict) -> None: ...
    def commit(self, group_name: str) -> None: ...
    def on_enter(self, group_name: str) -> None: ...
    def on_exit(self, group_name: str, success: bool) -> None: ...
InMemoryBackend¶
HBIA provides a built-in in-memory backend for testing:
from honey_badgeria.back.atomicity import InMemoryBackend
backend = InMemoryBackend()
# After execution, query what happened:
assert backend.was_committed()
assert not backend.was_rolled_back()
snapshot = backend.get_snapshot()
backend.reset()
Custom Backends¶
Implement TransactionBackend to integrate with your database or external system:
from honey_badgeria.back.atomicity.backends import TransactionBackend


class PostgresBackend(TransactionBackend):
    def __init__(self, connection):
        self.conn = connection

    # group_name is interpolated into SQL, so it must be a valid,
    # trusted SQL identifier.
    def on_enter(self, group_name):
        self.conn.execute(f"SAVEPOINT {group_name}")

    def commit(self, group_name):
        self.conn.execute(f"RELEASE SAVEPOINT {group_name}")

    def rollback(self, group_name, snapshot_data):
        self.conn.execute(f"ROLLBACK TO SAVEPOINT {group_name}")

    def save_snapshot(self, group_name, data):
        pass  # PostgreSQL handles this via savepoints

    def on_exit(self, group_name, success):
        pass
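The savepoint approach can be tried without a PostgreSQL server by using Python's built-in sqlite3 module, which supports the same SAVEPOINT statements. The sketch below is an illustration under assumptions: `SQLiteSavepointBackend` is a hypothetical class that mirrors the five TransactionBackend methods but does not subclass it, so the block runs on its own; in a real project it would inherit from TransactionBackend. The identifier check is an added safeguard, not something the interface requires.

```python
import re
import sqlite3


class SQLiteSavepointBackend:
    """Savepoint-based sketch with the same five methods as TransactionBackend."""

    def __init__(self, connection):
        self.conn = connection

    @staticmethod
    def _savepoint(group_name):
        # Savepoint names are identifiers and cannot be bound as parameters,
        # so validate the name before interpolating it into SQL.
        if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", group_name):
            raise ValueError(f"invalid group name: {group_name!r}")
        return group_name

    def on_enter(self, group_name):
        self.conn.execute(f"SAVEPOINT {self._savepoint(group_name)}")

    def save_snapshot(self, group_name, data):
        pass  # the savepoint already records the database state

    def commit(self, group_name):
        self.conn.execute(f"RELEASE SAVEPOINT {self._savepoint(group_name)}")

    def rollback(self, group_name, snapshot_data):
        name = self._savepoint(group_name)
        self.conn.execute(f"ROLLBACK TO SAVEPOINT {name}")
        # ROLLBACK TO keeps the savepoint open; release it to end the transaction.
        self.conn.execute(f"RELEASE SAVEPOINT {name}")

    def on_exit(self, group_name, success):
        pass


# isolation_level=None stops sqlite3 from opening implicit transactions.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE charges (amount REAL)")
backend = SQLiteSavepointBackend(conn)

# Failure path: the insert is undone.
backend.on_enter("payment_txn")
conn.execute("INSERT INTO charges VALUES (42.0)")
backend.rollback("payment_txn", {})
rows_after_rollback = conn.execute("SELECT COUNT(*) FROM charges").fetchone()[0]

# Success path: the insert is kept.
backend.on_enter("payment_txn")
conn.execute("INSERT INTO charges VALUES (42.0)")
backend.commit("payment_txn")
rows_after_commit = conn.execute("SELECT COUNT(*) FROM charges").fetchone()[0]
```

Releasing the savepoint after rolling back to it is a design choice for this sketch: it closes the enclosing transaction so the same group name can be reused on the next run.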
Example: Payment Processing¶
flow:
  process_payment:
    validate_card:
      handler: vertices.payment.validate_card
      effect: pure
      version: "1"
      inputs:
        card_number: str
        amount: float
      outputs:
        card_valid: bool
      next:
        - reserve_funds
    reserve_funds:
      handler: vertices.payment.reserve
      effect: side_effect
      version: "1"
      inputs:
        card_valid: validate_card.card_valid
        amount: float
      outputs:
        reservation_id: str
      next:
        - charge
    charge:
      handler: vertices.payment.charge
      effect: side_effect
      version: "1"
      inputs:
        reservation_id: reserve_funds.reservation_id
      outputs:
        transaction_id: str
      next:
        - confirm
    confirm:
      handler: vertices.payment.confirm
      effect: side_effect
      version: "1"
      inputs:
        transaction_id: charge.transaction_id
      outputs:
        confirmed: bool
atomic_groups:
  payment_txn:
    vertices: [reserve_funds, charge, confirm]
    on_failure: rollback
    no_cache: true
    no_parallel: true
In this example:
- validate_card runs outside the atomic group (it's a pure check).
- reserve_funds, charge, and confirm run inside the atomic group.
- If charge fails, the results of reserve_funds are rolled back.
- The entire group either succeeds completely or fails completely.
When Rollback Isn't Enough¶
For workflows involving external APIs where true rollback is impossible (you can't "un-call" an API), use the SAGA pattern instead: