
Outbox & event flow

Implemented Contract

Source: packages/shared/outbox/ · cmd/outbox-relay/ · packages/shared/events/ · doc/architecture/NATS_Stream_Config.md

GPUaaS never publishes to NATS directly from an HTTP handler. Instead, every event is written to the `outbox_events` table in the same Postgres transaction as the domain change it describes. The outbox-relay process then polls the outbox, publishes each row to NATS, and marks it as published. This is the transactional outbox pattern.

Why outbox

Without the outbox, a handler can fail in one of two unsafe ways:

```mermaid
flowchart LR
    A[Handler writes DB] --> X1{crash before NATS}
    X1 -- yes --> L1[Event lost — system inconsistent]
    A --> NATS_DIRECT[Publish to NATS]
    NATS_DIRECT --> X2{crash before DB commit}
    X2 -- yes --> L2[Event emitted for change<br/>that never happened]
```

With the outbox, both failure modes collapse to the same safe outcome: the consumer eventually sees the event exactly when, and only when, the row is committed:

```mermaid
flowchart LR
    A[Handler<br/>BEGIN TX] --> B[Domain INSERT/UPDATE]
    B --> C[INSERT outbox_events row]
    C --> D[COMMIT]
    D -.poll.-> OR[outbox-relay]
    OR --> NATS[(NATS)]
    OR --> E[UPDATE outbox status='published']
```

Outbox row shape

```sql
CREATE TABLE outbox_events (
  id             uuid PRIMARY KEY,
  event_type     text NOT NULL,          -- 'provisioning.requested', etc.
  subject        text NOT NULL,          -- NATS subject
  envelope       jsonb NOT NULL,         -- full event envelope
  correlation_id text NOT NULL,
  status         text NOT NULL DEFAULT 'pending',
  attempt_count  int  NOT NULL DEFAULT 0,
  last_error     text NULL,
  created_at     timestamptz NOT NULL,
  published_at   timestamptz NULL
);
```

Envelope shape

Every event uses this envelope:

```json
{
  "event_id": "uuid",
  "event_type": "domain.name",
  "occurred_at": "RFC3339",
  "version": "1.0",
  "correlation_id": "uuid",
  "payload": { /* type-specific */ }
}
```

NATS streams

```mermaid
flowchart LR
    subgraph S1[PROVISIONING]
      P1[provisioning.requested]
      P2[provisioning.active]
      P3[provisioning.failed]
      P4[provisioning.releasing.requested]
      P5[provisioning.releasing.completed]
      P6[provisioning.release_failed]
      P7[provisioning.force_release_requested]
    end

    subgraph S2[BILLING]
      B1[billing.low_balance_warning]
      B2[billing.auto_release_pending]
      B3[billing.balance_depleted]
    end

    subgraph S3[PAYMENTS]
      Y1[payments.balance_credited]
    end

    subgraph S4[DLQ]
      D1["dlq.*"]
    end

    classDef stream fill:#e3f2fd,stroke:#1565c0
    class S1,S2,S3,S4 stream
```
| Stream | Subject pattern | Retention | Consumers |
|---|---|---|---|
| PROVISIONING | `provisioning.>` | Limits-based | provisioning-worker (workflow start), billing-worker (start/stop accrual), notification-relay (WS) |
| BILLING | `billing.>` | Limits-based | notification-relay (WS + email), billing-worker (force-release trigger) |
| PAYMENTS | `payments.>` | Limits-based | billing-worker (credit ledger), notification-relay |
| DLQ | `dlq.>` | Long retention | Ops alerts |

Stream init: `packages/shared/events.InitStreams()` ensures streams and consumers are created at boot.
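The subject-to-stream routing in the table is mechanical enough to sketch as a pure function. `streamFor` is a hypothetical helper for illustration, not the actual `InitStreams` implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// streamFor maps a NATS subject to its stream, following the
// subject-pattern table above (prefix match on the first token).
func streamFor(subject string) string {
	switch {
	case strings.HasPrefix(subject, "provisioning."):
		return "PROVISIONING"
	case strings.HasPrefix(subject, "billing."):
		return "BILLING"
	case strings.HasPrefix(subject, "payments."):
		return "PAYMENTS"
	case strings.HasPrefix(subject, "dlq."):
		return "DLQ"
	}
	return "" // unknown subject: reject before it reaches the outbox
}

func main() {
	fmt.Println(streamFor("provisioning.releasing.completed")) // PROVISIONING
}
```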

Consumer matrix

```mermaid
flowchart LR
    P_REQ[provisioning.requested] --> PW[provisioning-worker]
    P_ACT[provisioning.active]    --> BW[billing-worker]
    P_ACT                         --> NR[notification-relay]
    P_FAIL[provisioning.failed]   --> NR
    P_REL[provisioning.releasing.requested] --> PW
    P_RDONE[provisioning.releasing.completed] --> BW
    P_RDONE                        --> NR
    P_RFAIL[provisioning.release_failed] --> BW
    P_RFAIL                        --> NR
    P_FORCE[provisioning.force_release_requested] --> PW

    B_LOW[billing.low_balance_warning] --> NR
    B_AUTO[billing.auto_release_pending] --> NR
    B_DEP[billing.balance_depleted] --> NR
    B_DEP --> PW

    Y_CR[payments.balance_credited] --> BW
    Y_CR --> NR
```

Per-subject contract

| Subject | Producer | Required payload | Triggers |
|---|---|---|---|
| `provisioning.requested` | orchestrator | `{allocation_id, capacity_shape, sku, node_id, slot_ids[]}` | provisioning-worker workflow start |
| `provisioning.active` | provisioning-worker | `{allocation_id, ready_at}` | billing-worker starts accrual; WS notify |
| `provisioning.failed` | provisioning-worker | `{allocation_id, failure_reason}` | WS notify; metrics |
| `provisioning.releasing.requested` | orchestrator (user/admin) | `{allocation_id, reason}` | provisioning-worker release flow |
| `provisioning.releasing.completed` | provisioning-worker | `{allocation_id, released_at, hard_stopped}` | billing stops; WS notify |
| `provisioning.release_failed` | provisioning-worker | `{allocation_id, error}` | billing stops; ops alert |
| `provisioning.force_release_requested` | billing-worker / admin | `{allocation_id, reason}` | provisioning-worker emergency release |
| `billing.low_balance_warning` | billing-worker | `{user_id, balance_minor, threshold_minor}` | WS + email |
| `billing.auto_release_pending` | billing-worker | `{user_id, projected_depletion_at}` | WS notify |
| `billing.balance_depleted` | billing-worker | `{user_id, balance_minor}` | force-release on active allocations |
| `payments.balance_credited` | webhook-worker / payments | `{user_id, amount_minor, source}` | billing reconcile; WS notify |

Idempotency & dedupe

- Producer side: the outbox row `id` is the dedupe key. The relay claims pending rows with `FOR UPDATE SKIP LOCKED`, so multiple relay replicas don't double-publish.
- Consumer side: workers check `event_id` against a processed-events table, or use Temporal workflow id derivation (`workflow-{event_id}`) for natural idempotency.
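The consumer-side check can be sketched with an in-memory set standing in for the processed-events table. A real worker would typically do the equivalent with `INSERT ... ON CONFLICT DO NOTHING` keyed on `event_id` and skip the handler when no row was inserted; the exact mechanism here is an assumption:

```go
package main

import (
	"fmt"
	"sync"
)

// processedSet is an in-memory stand-in for the processed-events table.
type processedSet struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newProcessedSet() *processedSet {
	return &processedSet{seen: make(map[string]struct{})}
}

// MarkIfNew reports whether eventID is being seen for the first time,
// so the handler body runs at most once per event even under the
// at-least-once redelivery the relay can produce.
func (p *processedSet) MarkIfNew(eventID string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, dup := p.seen[eventID]; dup {
		return false
	}
	p.seen[eventID] = struct{}{}
	return true
}

func main() {
	dedupe := newProcessedSet()
	fmt.Println(dedupe.MarkIfNew("evt-1")) // true: first delivery, handle it
	fmt.Println(dedupe.MarkIfNew("evt-1")) // false: redelivery, skip
}
```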

Sample handler pattern

```go
func (s *Service) CreateRequested(ctx context.Context, in CreateRequestedInput) (*Allocation, error) {
    tx, err := s.db.Begin(ctx)
    if err != nil { return nil, err }
    defer tx.Rollback(ctx)

    // 1. reserve placement
    nodeID, slotIDs, err := s.reservePlacement(ctx, tx, in)
    if err != nil { return nil, err }

    // 2. insert allocation
    alloc, err := s.insertAllocation(ctx, tx, in, nodeID, slotIDs)
    if err != nil { return nil, err }

    // 3. write outbox row in same tx
    if err := s.outbox.Insert(ctx, tx, outbox.Row{
        EventType:     "provisioning.requested",
        Subject:       "provisioning.requested",
        CorrelationID: in.CorrelationID,
        Envelope:      envelope(alloc),
    }); err != nil { return nil, err }

    // 4. audit
    if err := s.audit.Write(ctx, tx, ...); err != nil { return nil, err }

    if err := tx.Commit(ctx); err != nil { return nil, err }
    return alloc, nil
}
```

CI gates that enforce this

| Gate | Rule |
|---|---|
| `audit_mandatory_guard.sh` | Privileged mutations write `audit_logs` in the same tx |
| Code review | Handlers never call `nats.Publish` directly |
| Integration tests | Outbox row exists after every domain mutation |
| `contracts_validate.sh` | AsyncAPI envelope matches code envelope |

Where to look next