
Atomicity

Atomic groups provide ACID-like semantics for critical sections of your DAG. When a group of vertices must succeed or fail as a unit, you wrap them in an atomic group.

Overview

An atomic group declares a set of vertices that execute as a single all-or-nothing transaction:

atomic_groups:
  payment_processing:
    vertices: [reserve_funds, charge, confirm]
    on_failure: rollback
    no_cache: true
    no_parallel: true

If any vertex in the group fails, the entire group is rolled back to its pre-execution state.

How It Works

Lifecycle

  1. Snapshot: Before the first vertex executes, AtomicContext captures snapshots of the DataStore, StateStore, and DataLineage.
  2. Execute: The group's vertices run, subject to the group's failure policy.
  3. Success: The snapshots are discarded and the changes are committed via TransactionBackend.commit().
  4. Failure: The snapshots are restored, TransactionBackend.rollback() is called, and the exception is re-raised.
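
The lifecycle above can be sketched in plain Python. This is a minimal illustration of the snapshot-and-restore idea, not HBIA's implementation: snapshot_scope is a hypothetical name, and plain dicts stand in for the DataStore and StateStore.

```python
import copy
from contextlib import contextmanager


@contextmanager
def snapshot_scope(*stores):
    """Hypothetical helper: restore every dict in `stores` if the body raises."""
    # 1. Snapshot: deep-copy each store before any vertex runs.
    snapshots = [copy.deepcopy(store) for store in stores]
    try:
        # 2. Execute: the body of the `with` block runs here.
        yield
    except Exception:
        # 4. Failure: restore each store in place, then re-raise.
        for store, snapshot in zip(stores, snapshots):
            store.clear()
            store.update(snapshot)
        raise
    # 3. Success: nothing to undo, so the snapshots are simply dropped.


data_store = {"balance": 100}
state_store = {"status": "idle"}

try:
    with snapshot_scope(data_store, state_store):
        data_store["balance"] -= 30
        state_store["status"] = "charging"
        raise RuntimeError("charge failed")
except RuntimeError:
    pass

print(data_store, state_store)  # {'balance': 100} {'status': 'idle'}
```

Restoring in place (clear, then update) keeps any other reference to the same store valid, which matters when several components share one store object.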

AtomicContext

The AtomicContext is a context manager that wraps group execution:

from honey_badgeria.back.atomicity import AtomicContext

with AtomicContext(group, backend, data_store, state_store, lineage, logger):
    # Execute group vertices
    for vertex in group.vertices:
        executor._execute_vertex(vertex)
# On success → backend.commit()
# On exception → backend.rollback(), snapshots restored

Configuration

In YAML

atomic_groups:
  payment_processing:
    vertices: [reserve_funds, charge, confirm]
    on_failure: rollback       # Failure policy
    no_cache: true             # Bypass cache in this group
    no_parallel: true          # Force serial execution

Failure Policies

Policy      Behavior                                              When to Use
rollback    Restore DataStore, StateStore, and Lineage snapshots  Database transactions, in-memory operations
compensate  Execute SAGA compensating handlers in reverse order   External API calls, third-party services
abort       Stop execution immediately with no cleanup            Unrecoverable failures
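
To make the compensate policy concrete, here is a minimal sketch of reverse-order compensation. It assumes nothing about HBIA's handler API: run_with_compensation and the step names are hypothetical, and each step is simply an (action, compensation) pair of callables.

```python
def run_with_compensation(steps):
    """Run (action, compensation) pairs; undo completed steps on failure.

    Hypothetical helper for illustration only.
    """
    completed = []
    try:
        for action, compensation in steps:
            action()
            # Only steps that finished successfully need compensating.
            completed.append(compensation)
    except Exception:
        # Compensate in reverse order of completion, then re-raise.
        for compensation in reversed(completed):
            compensation()
        raise


log = []


def fail_charge():
    raise RuntimeError("charge declined")


steps = [
    (lambda: log.append("reserve"), lambda: log.append("release")),
    (lambda: log.append("notify"), lambda: log.append("retract")),
    (fail_charge, lambda: log.append("refund")),
]

try:
    run_with_compensation(steps)
except RuntimeError:
    pass

print(log)  # ['reserve', 'notify', 'retract', 'release']
```

The failed step's own compensation ("refund") never runs, because the step did not complete. Whether a real system should also compensate a partially completed step depends on the external service.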

no_cache

When no_cache: true (default), vertices in the group always execute fresh, even if CACHE_ENABLED=True globally. This ensures atomic operations see current state, not stale cached results.

no_parallel

When no_parallel: true (default), vertices in the group execute serially, even if PARALLEL_ENABLED=True globally. This prevents race conditions in transactional workflows.
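
The way the no_cache and no_parallel flags override the global settings can be summarized in a few lines. This is a sketch of the rule as described here, not HBIA's code: GroupOptions and effective_settings are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class GroupOptions:
    """Hypothetical stand-in for an atomic group's flags (both default to True)."""
    no_cache: bool = True
    no_parallel: bool = True


def effective_settings(cache_enabled, parallel_enabled, group=None):
    """Return (use_cache, run_parallel) for a vertex.

    `group` is None for vertices outside any atomic group.
    """
    if group is None:
        return cache_enabled, parallel_enabled
    # A group flag can only switch a feature off, never on.
    use_cache = cache_enabled and not group.no_cache
    run_parallel = parallel_enabled and not group.no_parallel
    return use_cache, run_parallel


print(effective_settings(True, True))                                # (True, True)
print(effective_settings(True, True, GroupOptions()))                # (False, False)
print(effective_settings(True, True, GroupOptions(no_cache=False)))  # (True, False)
```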

TransactionBackend

The TransactionBackend is an abstract interface for integrating with external transaction systems:

from abc import ABC

# Shape of the interface. Import the real class with:
#   from honey_badgeria.back.atomicity.backends import TransactionBackend
class TransactionBackend(ABC):
    def save_snapshot(self, group_name: str, data: dict) -> None: ...
    def rollback(self, group_name: str, snapshot_data: dict) -> None: ...
    def commit(self, group_name: str) -> None: ...
    def on_enter(self, group_name: str) -> None: ...
    def on_exit(self, group_name: str, success: bool) -> None: ...

InMemoryBackend

HBIA provides a built-in in-memory backend for testing:

from honey_badgeria.back.atomicity import InMemoryBackend

backend = InMemoryBackend()

# After execution, query what happened:
assert backend.was_committed()
assert not backend.was_rolled_back()
snapshot = backend.get_snapshot()
backend.reset()

Custom Backends

Implement TransactionBackend to integrate with your database or external system:

from honey_badgeria.back.atomicity.backends import TransactionBackend

class PostgresBackend(TransactionBackend):
    def __init__(self, connection):
        # Assumes a connection object that exposes execute(), such as a psycopg 3 connection
        self.conn = connection

    def on_enter(self, group_name):
        # group_name is interpolated into SQL, so it must be a trusted, valid identifier
        self.conn.execute(f"SAVEPOINT {group_name}")

    def commit(self, group_name):
        self.conn.execute(f"RELEASE SAVEPOINT {group_name}")

    def rollback(self, group_name, snapshot_data):
        self.conn.execute(f"ROLLBACK TO SAVEPOINT {group_name}")

    def save_snapshot(self, group_name, data):
        pass  # PostgreSQL handles this via savepoints

    def on_exit(self, group_name, success):
        pass
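
A custom backend like this can be exercised without a database by handing it a recording test double. The sketch below is self-contained so it runs without HBIA installed: FakeConnection, savepoint_name, and SavepointBackend are hypothetical names, and SavepointBackend only mirrors the method set of the class above (real code would subclass TransactionBackend). It also validates the group name before interpolating it into SQL, which is an addition not present in the example above.

```python
import re


class FakeConnection:
    """Test double that records SQL instead of talking to a database."""

    def __init__(self):
        self.statements = []

    def execute(self, sql):
        self.statements.append(sql)


def savepoint_name(group_name):
    """Reject names that are not plain identifiers before putting them in SQL."""
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", group_name):
        raise ValueError(f"unsafe savepoint name: {group_name!r}")
    return group_name


class SavepointBackend:
    """Same method set as PostgresBackend; subclass TransactionBackend in real code."""

    def __init__(self, connection):
        self.conn = connection

    def on_enter(self, group_name):
        self.conn.execute(f"SAVEPOINT {savepoint_name(group_name)}")

    def commit(self, group_name):
        self.conn.execute(f"RELEASE SAVEPOINT {savepoint_name(group_name)}")

    def rollback(self, group_name, snapshot_data):
        self.conn.execute(f"ROLLBACK TO SAVEPOINT {savepoint_name(group_name)}")

    def save_snapshot(self, group_name, data):
        pass  # Nothing to store; the savepoint is the snapshot.

    def on_exit(self, group_name, success):
        pass


conn = FakeConnection()
backend = SavepointBackend(conn)

# Drive the failure path by hand: enter the group, then roll back.
backend.on_enter("payment_txn")
backend.rollback("payment_txn", snapshot_data={})
print(conn.statements)
# ['SAVEPOINT payment_txn', 'ROLLBACK TO SAVEPOINT payment_txn']
```

The order in which the runtime actually invokes these hooks is an assumption here; the sketch only checks that each hook issues the statement you expect.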

Example: Payment Processing

flow:
  process_payment:
    validate_card:
      handler: vertices.payment.validate_card
      effect: pure
      version: "1"
      inputs:
        card_number: str
        amount: float
      outputs:
        card_valid: bool
      next:
        - reserve_funds

    reserve_funds:
      handler: vertices.payment.reserve
      effect: side_effect
      version: "1"
      inputs:
        card_valid: validate_card.card_valid
        amount: float
      outputs:
        reservation_id: str
      next:
        - charge

    charge:
      handler: vertices.payment.charge
      effect: side_effect
      version: "1"
      inputs:
        reservation_id: reserve_funds.reservation_id
      outputs:
        transaction_id: str
      next:
        - confirm

    confirm:
      handler: vertices.payment.confirm
      effect: side_effect
      version: "1"
      inputs:
        transaction_id: charge.transaction_id
      outputs:
        confirmed: bool

atomic_groups:
  payment_txn:
    vertices: [reserve_funds, charge, confirm]
    on_failure: rollback
    no_cache: true
    no_parallel: true

In this example:

  • validate_card runs outside the atomic group (it's a pure check).
  • reserve_funds, charge, and confirm run inside the atomic group.
  • If charge fails, the results of reserve_funds are rolled back.
  • The entire group either succeeds completely or fails completely.
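
A typo in an atomic group's vertex list is easy to make, so a quick sanity check before running a flow can help. The sketch below works on a plain dict that mirrors the structure of the YAML above (trimmed to the relevant keys); check_groups is a hypothetical helper, and whether HBIA performs an equivalent validation itself is not covered here.

```python
config = {
    "flow": {
        "process_payment": {
            "validate_card": {"next": ["reserve_funds"]},
            "reserve_funds": {"next": ["charge"]},
            "charge": {"next": ["confirm"]},
            "confirm": {},
        }
    },
    "atomic_groups": {
        "payment_txn": {
            "vertices": ["reserve_funds", "charge", "confirm"],
            "on_failure": "rollback",
        }
    },
}


def check_groups(config):
    """Return {group_name: [unknown vertex names]} for groups with problems."""
    # Collect every vertex name declared in any flow.
    known = set()
    for vertices in config["flow"].values():
        known.update(vertices)

    problems = {}
    for name, group in config["atomic_groups"].items():
        missing = [v for v in group["vertices"] if v not in known]
        if missing:
            problems[name] = missing
    return problems


print(check_groups(config))  # {}
```

An empty result means every vertex named in a group is declared in some flow. A misspelled entry such as "chrage" would be reported under its group name.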

When Rollback Isn't Enough

For workflows involving external APIs where true rollback is impossible (you can't "un-call" an API), use the SAGA pattern instead:

SAGA Pattern