Outbox & event flow
Implemented Contract
Source: `packages/shared/outbox/` · `cmd/outbox-relay/` · `packages/shared/events/` · `doc/architecture/NATS_Stream_Config.md`
GPUaaS never publishes to NATS directly from an HTTP handler. Instead, every event is written to the `outbox_events` table in the same Postgres transaction as the domain change. The outbox-relay process polls that table, publishes each pending row to NATS, and marks the row as published. This is the transactional outbox pattern.
Why outbox
Without an outbox, a handler that both writes to Postgres and publishes to NATS can fail in one of two unsafe ways:
```mermaid
flowchart LR
A[Handler writes DB] --> X1{crash before NATS}
X1 -- yes --> L1[Event lost: system inconsistent]
A --> NATS_DIRECT[Publish to NATS]
NATS_DIRECT --> X2{crash before DB commit}
X2 -- yes --> L2[Event emitted for change<br/>that never happened]
```
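With the outbox, both failure modes collapse into one guarantee: the event is published if and only if the domain change commits. Consumers see it eventually (after the relay's next poll), possibly more than once: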
```mermaid
flowchart LR
A[Handler<br/>BEGIN TX] --> B[Domain INSERT/UPDATE]
B --> C[INSERT outbox_events row]
C --> D[COMMIT]
D -.poll.-> OR[outbox-relay]
OR --> NATS[(NATS)]
OR --> E[UPDATE outbox status='published']
```
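The relay side of this flow can be sketched in memory. This is a minimal illustration, not the `cmd/outbox-relay` implementation: `Row`, `Publisher`, `relayOnce`, and `flakyPublisher` are hypothetical names, a slice stands in for the Postgres query, and incrementing `attempt_count` on every try is an assumption. It shows why the relay publishes before it marks a row: a failed publish leaves the row pending for the next poll.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Row holds the outbox_events columns the relay reads and updates
// (hypothetical in-memory stand-in for a database row).
type Row struct {
	ID           string
	Subject      string
	Envelope     []byte
	Status       string // "pending" or "published"
	AttemptCount int
	LastError    string
	PublishedAt  *time.Time
}

// Publisher abstracts the NATS publish call (hypothetical interface).
type Publisher interface {
	Publish(subject string, data []byte) error
}

// relayOnce runs one poll over pending rows and returns how many it published.
// Each row is published first and marked second, so a crash between the two
// steps leads to a republish on the next poll (at-least-once delivery).
func relayOnce(rows []*Row, pub Publisher) int {
	published := 0
	for _, r := range rows {
		if r.Status != "pending" {
			continue
		}
		r.AttemptCount++
		if err := pub.Publish(r.Subject, r.Envelope); err != nil {
			r.LastError = err.Error() // stays pending; retried on the next poll
			continue
		}
		now := time.Now().UTC()
		r.Status = "published"
		r.PublishedAt = &now
		r.LastError = ""
		published++
	}
	return published
}

// flakyPublisher fails for selected subjects, to exercise the retry path.
type flakyPublisher struct {
	fail map[string]bool
}

func (p *flakyPublisher) Publish(subject string, data []byte) error {
	if p.fail[subject] {
		return errors.New("simulated publish failure")
	}
	return nil
}

func main() {
	rows := []*Row{
		{ID: "a", Subject: "provisioning.requested", Status: "pending"},
		{ID: "b", Subject: "billing.low_balance_warning", Status: "pending"},
	}
	pub := &flakyPublisher{fail: map[string]bool{"billing.low_balance_warning": true}}
	n := relayOnce(rows, pub)
	fmt.Println(n, rows[0].Status, rows[1].Status, rows[1].LastError)
	delete(pub.fail, "billing.low_balance_warning") // the next poll succeeds
	n = relayOnce(rows, pub)
	fmt.Println(n, rows[1].Status, rows[1].AttemptCount)
}
```

The same ordering is what makes consumer-side deduplication necessary: marking before publishing would instead risk losing events.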
Outbox row shape
```sql
CREATE TABLE outbox_events (
  id             uuid PRIMARY KEY,
  event_type     text NOT NULL,          -- 'provisioning.requested', etc.
  subject        text NOT NULL,          -- NATS subject
  envelope       jsonb NOT NULL,         -- full event envelope
  correlation_id text NOT NULL,
  status         text NOT NULL DEFAULT 'pending',
  attempt_count  int  NOT NULL DEFAULT 0,
  last_error     text NULL,
  created_at     timestamptz NOT NULL,
  published_at   timestamptz NULL
);
```
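A Go type mirroring this table might look like the following. It is a hypothetical mapping (`OutboxRow`, `NewPendingRow`, and `newUUIDv4` are not taken from `packages/shared/outbox/`). Because the DDL gives `id` and `created_at` no defaults, the sketch assumes the application fills them in with a random version-4 UUID and the current UTC time. Nullable columns map to pointers so that `NULL` stays distinguishable from an empty value.

```go
package main

import (
	"crypto/rand"
	"encoding/json"
	"fmt"
	"time"
)

// OutboxRow mirrors the outbox_events columns (hypothetical Go mapping).
type OutboxRow struct {
	ID            string
	EventType     string
	Subject       string
	Envelope      json.RawMessage
	CorrelationID string
	Status        string
	AttemptCount  int
	LastError     *string    // NULL until a publish attempt fails
	CreatedAt     time.Time
	PublishedAt   *time.Time // NULL until the relay publishes the row
}

// newUUIDv4 returns a random RFC 4122 version-4 UUID string.
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// NewPendingRow builds a row with the same defaults the DDL declares
// (status 'pending', attempt_count 0) plus an assumed id and created_at.
func NewPendingRow(eventType, subject, correlationID string, envelope json.RawMessage) (OutboxRow, error) {
	id, err := newUUIDv4()
	if err != nil {
		return OutboxRow{}, err
	}
	return OutboxRow{
		ID:            id,
		EventType:     eventType,
		Subject:       subject,
		Envelope:      envelope,
		CorrelationID: correlationID,
		Status:        "pending",
		AttemptCount:  0,
		CreatedAt:     time.Now().UTC(),
	}, nil
}

func main() {
	row, err := NewPendingRow("provisioning.requested", "provisioning.requested", "corr-123", json.RawMessage(`{}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(row.ID, row.Status, row.AttemptCount, row.PublishedAt == nil)
}
```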
Envelope shape
Every event uses this envelope:
```json
{
  "event_id": "uuid",
  "event_type": "domain.name",
  "occurred_at": "RFC3339",
  "version": "1.0",
  "correlation_id": "uuid",
  "payload": { /* type-specific */ }
}
```
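A Go struct with JSON tags can produce exactly this shape. In the sketch below, `Envelope`, `NewEnvelope`, and `exampleTime` are hypothetical names; keeping `payload` as `json.RawMessage` is a design assumption that lets each subject decode its own type-specific payload later.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Envelope mirrors the documented event envelope field for field.
type Envelope struct {
	EventID       string          `json:"event_id"`
	EventType     string          `json:"event_type"`
	OccurredAt    string          `json:"occurred_at"` // RFC 3339
	Version       string          `json:"version"`
	CorrelationID string          `json:"correlation_id"`
	Payload       json.RawMessage `json:"payload"` // type-specific, decoded per subject
}

// NewEnvelope marshals payload and wraps it, stamping occurred_at in UTC.
func NewEnvelope(eventID, eventType, correlationID string, at time.Time, payload any) ([]byte, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	return json.Marshal(Envelope{
		EventID:       eventID,
		EventType:     eventType,
		OccurredAt:    at.UTC().Format(time.RFC3339),
		Version:       "1.0",
		CorrelationID: correlationID,
		Payload:       raw,
	})
}

// exampleTime is a fixed timestamp so the demo output is reproducible.
var exampleTime = time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)

func main() {
	b, err := NewEnvelope("e-1", "provisioning.active", "c-1", exampleTime,
		map[string]string{"allocation_id": "a-1", "ready_at": "2024-01-02T03:04:05Z"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```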
NATS streams
```mermaid
flowchart LR
subgraph S1[PROVISIONING]
  P1[provisioning.requested]
  P2[provisioning.active]
  P3[provisioning.failed]
  P4[provisioning.releasing.requested]
  P5[provisioning.releasing.completed]
  P6[provisioning.release_failed]
  P7[provisioning.force_release_requested]
end
subgraph S2[BILLING]
  B1[billing.low_balance_warning]
  B2[billing.auto_release_pending]
  B3[billing.balance_depleted]
end
subgraph S3[PAYMENTS]
  Y1[payments.balance_credited]
end
subgraph S4[DLQ]
  D1["dlq.>"]
end
classDef stream fill:#e3f2fd,stroke:#1565c0
class S1,S2,S3,S4 stream
```
| Stream | Subject pattern | Retention | Consumers |
|---|---|---|---|
| PROVISIONING | `provisioning.>` | Limits-based | provisioning-worker (workflow start), billing-worker (start/stop accrual), notification-relay (WS) |
| BILLING | `billing.>` | Limits-based | notification-relay (WS + email), billing-worker (force-release trigger) |
| PAYMENTS | `payments.>` | Limits-based | billing-worker (credit ledger), notification-relay |
| DLQ | `dlq.>` | Long retention | Ops alerts |
Stream initialization: `packages/shared/events.InitStreams()` creates the streams and their consumers at boot if they do not already exist.
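The subject patterns partition events between streams using NATS wildcard rules: `*` matches exactly one dot-separated token, and `>` (allowed only as the last token) matches one or more tokens. The sketch below illustrates that routing with hypothetical names (`streams`, `matchSubject`, `streamFor`); it does not reproduce `InitStreams()`, which creates the streams on the server.

```go
package main

import (
	"fmt"
	"strings"
)

// streams lists each stream's subject filter, transcribed from the table.
var streams = []struct{ Name, Filter string }{
	{"PROVISIONING", "provisioning.>"},
	{"BILLING", "billing.>"},
	{"PAYMENTS", "payments.>"},
	{"DLQ", "dlq.>"},
}

// matchSubject applies NATS wildcard rules: "*" matches exactly one token,
// and a trailing ">" matches one or more remaining tokens.
func matchSubject(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

// streamFor returns the stream that captures subject, or "" if none does.
func streamFor(subject string) string {
	for _, st := range streams {
		if matchSubject(st.Filter, subject) {
			return st.Name
		}
	}
	return ""
}

func main() {
	for _, subj := range []string{"provisioning.releasing.requested", "payments.balance_credited", "orders.created"} {
		fmt.Printf("%s -> %q\n", subj, streamFor(subj))
	}
}
```

Because every filter ends in `>`, multi-token subjects such as `provisioning.releasing.requested` still land in their domain's stream, while a bare `billing` subject would match no stream.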
Consumer matrix
```mermaid
flowchart LR
P_REQ[provisioning.requested] --> PW[provisioning-worker]
P_ACT[provisioning.active] --> BW[billing-worker]
P_ACT --> NR[notification-relay]
P_FAIL[provisioning.failed] --> NR
P_REL[provisioning.releasing.requested] --> PW
P_RDONE[provisioning.releasing.completed] --> BW
P_RDONE --> NR
P_RFAIL[provisioning.release_failed] --> BW
P_RFAIL --> NR
P_FORCE[provisioning.force_release_requested] --> PW
B_LOW[billing.low_balance_warning] --> NR
B_AUTO[billing.auto_release_pending] --> NR
B_DEP[billing.balance_depleted] --> NR
B_DEP --> PW
Y_CR[payments.balance_credited] --> BW
Y_CR --> NR
```
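Inverting the matrix gives the list of subjects each worker subscribes to, which could, for example, seed that worker's consumer filter subjects (an assumption about how consumers are configured). The map below is transcribed from the diagram; `consumers` and `subjectsFor` are hypothetical names, not part of `packages/shared/events`.

```go
package main

import (
	"fmt"
	"sort"
)

// consumers transcribes the consumer matrix: subject -> subscribing workers.
var consumers = map[string][]string{
	"provisioning.requested":               {"provisioning-worker"},
	"provisioning.active":                  {"billing-worker", "notification-relay"},
	"provisioning.failed":                  {"notification-relay"},
	"provisioning.releasing.requested":     {"provisioning-worker"},
	"provisioning.releasing.completed":     {"billing-worker", "notification-relay"},
	"provisioning.release_failed":          {"billing-worker", "notification-relay"},
	"provisioning.force_release_requested": {"provisioning-worker"},
	"billing.low_balance_warning":          {"notification-relay"},
	"billing.auto_release_pending":         {"notification-relay"},
	"billing.balance_depleted":             {"notification-relay", "provisioning-worker"},
	"payments.balance_credited":            {"billing-worker", "notification-relay"},
}

// subjectsFor inverts the matrix and returns, sorted, the subjects one
// worker must subscribe to.
func subjectsFor(worker string) []string {
	var out []string
	for subj, workers := range consumers {
		for _, w := range workers {
			if w == worker {
				out = append(out, subj)
				break
			}
		}
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

func main() {
	for _, w := range []string{"provisioning-worker", "billing-worker", "notification-relay"} {
		fmt.Println(w, subjectsFor(w))
	}
}
```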
Per-subject contract
| Subject | Producer | Required payload | Triggers |
|---|---|---|---|
| `provisioning.requested` | orchestrator | `{allocation_id, capacity_shape, sku, node_id, slot_ids[]}` | provisioning-worker workflow start |
| `provisioning.active` | provisioning-worker | `{allocation_id, ready_at}` | billing-worker start accrual; WS notify |
| `provisioning.failed` | provisioning-worker | `{allocation_id, failure_reason}` | WS notify; metrics |
| `provisioning.releasing.requested` | orchestrator (user/admin) | `{allocation_id, reason}` | provisioning-worker release flow |
| `provisioning.releasing.completed` | provisioning-worker | `{allocation_id, released_at, hard_stopped}` | billing stops; WS notify |
| `provisioning.release_failed` | provisioning-worker | `{allocation_id, error}` | billing stops; ops alert |
| `provisioning.force_release_requested` | billing-worker / admin | `{allocation_id, reason}` | provisioning-worker emergency release |
| `billing.low_balance_warning` | billing-worker | `{user_id, balance_minor, threshold_minor}` | WS + email |
| `billing.auto_release_pending` | billing-worker | `{user_id, projected_depletion_at}` | WS notify |
| `billing.balance_depleted` | billing-worker | `{user_id, balance_minor}` | force-release on active allocations |
| `payments.balance_credited` | webhook-worker / payments | `{user_id, amount_minor, source}` | billing reconcile; WS notify |
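A presence check for the required payload keys could look like this. `required` transcribes only part of the table, and `validatePayload` is a hypothetical helper: it checks that top-level keys exist, not their types or values. It is not the `contracts_validate.sh` gate, which compares against the AsyncAPI spec.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// required transcribes part of the per-subject contract table.
var required = map[string][]string{
	"provisioning.requested":           {"allocation_id", "capacity_shape", "sku", "node_id", "slot_ids"},
	"provisioning.active":              {"allocation_id", "ready_at"},
	"provisioning.releasing.completed": {"allocation_id", "released_at", "hard_stopped"},
	"billing.low_balance_warning":      {"user_id", "balance_minor", "threshold_minor"},
	"payments.balance_credited":        {"user_id", "amount_minor", "source"},
}

// validatePayload reports which required top-level keys are absent.
// It checks presence only; a schema validator would also check types.
func validatePayload(subject string, payload []byte) error {
	keys, ok := required[subject]
	if !ok {
		return fmt.Errorf("no contract for subject %q", subject)
	}
	var obj map[string]json.RawMessage
	if err := json.Unmarshal(payload, &obj); err != nil {
		return fmt.Errorf("%s: payload is not a JSON object: %w", subject, err)
	}
	var missing []string
	for _, k := range keys {
		if _, present := obj[k]; !present {
			missing = append(missing, k)
		}
	}
	if len(missing) > 0 {
		sort.Strings(missing)
		return fmt.Errorf("%s: missing %v", subject, missing)
	}
	return nil
}

func main() {
	fmt.Println(validatePayload("provisioning.active", []byte(`{"allocation_id":"a-1"}`)))
	// prints: provisioning.active: missing [ready_at]
}
```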
Idempotency & dedupe
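- Producer side: the outbox row `id` is the dedupe key. The relay selects pending rows with `FOR UPDATE SKIP LOCKED`, so multiple relay replicas never claim the same row concurrently and don't double-publish it. A relay that crashes after publishing but before marking a row published will publish it again on its next poll, so delivery is at-least-once.
- Consumer side: workers check `event_id` against a processed-events table, or derive the Temporal workflow id from it (`workflow-{event_id}`) for natural idempotency.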
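The consumer side can be sketched with an in-memory set standing in for the processed-events table. `Deduper`, `HandleOnce`, `workflowID`, and `errSimulated` are hypothetical names. The sketch records an `event_id` only after the handler succeeds, so a failed attempt is retried on redelivery; with a real table, that insert would belong in the same transaction as the handler's side effects. Holding the mutex while the handler runs serializes handlers, which keeps the sketch simple at the cost of throughput.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSimulated is a stand-in handler failure used by the demo.
var errSimulated = errors.New("simulated handler failure")

// Deduper stands in for a processed-events table: it remembers which
// event_ids this consumer has already handled successfully.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]bool
}

func NewDeduper() *Deduper { return &Deduper{seen: map[string]bool{}} }

// HandleOnce runs fn unless eventID was already processed and reports
// whether fn ran. The id is recorded only when fn succeeds, so a failed
// attempt is retried on redelivery.
func (d *Deduper) HandleOnce(eventID string, fn func() error) (bool, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[eventID] {
		return false, nil // duplicate delivery: skip
	}
	if err := fn(); err != nil {
		return true, err
	}
	d.seen[eventID] = true
	return true, nil
}

// workflowID derives a deterministic Temporal workflow id, so every
// redelivery of the same event maps to the same workflow.
func workflowID(eventID string) string { return "workflow-" + eventID }

func main() {
	d := NewDeduper()
	calls := 0
	for i := 0; i < 3; i++ { // the same event delivered three times
		d.HandleOnce("e-42", func() error { calls++; return nil })
	}
	_, err := d.HandleOnce("e-43", func() error { return errSimulated })
	ran, _ := d.HandleOnce("e-43", func() error { return nil }) // retried after the failure
	fmt.Println(calls, err, ran, workflowID("e-42"))
	// prints: 1 simulated handler failure true workflow-e-42
}
```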
Sample handler pattern
```go
func (s *Service) CreateRequested(ctx context.Context, in CreateRequestedInput) (*Allocation, error) {
	tx, err := s.db.Begin(ctx)
	if err != nil { return nil, err }
	// Any early return rolls back the domain rows and the outbox row together.
	// Rollback after a successful Commit has no effect.
	defer tx.Rollback(ctx)

	// 1. Reserve placement (node and slots) inside the transaction.
	nodeID, slotIDs, err := s.reservePlacement(ctx, tx, in)
	if err != nil { return nil, err }

	// 2. Insert the allocation row.
	alloc, err := s.insertAllocation(ctx, tx, in, nodeID, slotIDs)
	if err != nil { return nil, err }

	// 3. Write the outbox row in the same tx: outbox-relay can only see it
	//    if this transaction commits.
	if err := s.outbox.Insert(ctx, tx, outbox.Row{
		EventType:     "provisioning.requested",
		Subject:       "provisioning.requested",
		CorrelationID: in.CorrelationID,
		Envelope:      envelope(alloc),
	}); err != nil { return nil, err }

	// 4. Audit log, also in the same tx.
	if err := s.audit.Write(ctx, tx, ...); err != nil { return nil, err }

	// Commit publishes nothing itself; outbox-relay picks the row up on its next poll.
	if err := tx.Commit(ctx); err != nil { return nil, err }
	return alloc, nil
}
```
CI gates that enforce this
| Gate | Rule |
|---|---|
| `audit_mandatory_guard.sh` | Privileged mutations write `audit_logs` in the same transaction |
| Code review | Handlers never call `nats.Publish` directly |
| Integration tests | An outbox row exists after every domain mutation |
| `contracts_validate.sh` | The AsyncAPI envelope schema matches the envelope defined in code |
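The code-review rule could, in principle, be backed by a static check. The sketch below is hypothetical (no such gate is listed above): `directPublishCalls` and `handlerSrc` are illustrative names, and the function parses one Go file with the standard `go/parser` package and reports every call of the form `x.Publish(...)`. It matches by method name only, without type information, so it would also flag unrelated `Publish` methods.

```go
package main

import (
	"fmt"
	"go/ast"
	"go/parser"
	"go/token"
)

// directPublishCalls reports the position of every call of the form
// x.Publish(...) in one Go source file. It matches the method name only
// (no type checking), so unrelated Publish methods are flagged too.
func directPublishCalls(filename, src string) ([]string, error) {
	fset := token.NewFileSet()
	file, err := parser.ParseFile(fset, filename, src, 0)
	if err != nil {
		return nil, err
	}
	var hits []string
	ast.Inspect(file, func(n ast.Node) bool {
		call, ok := n.(*ast.CallExpr)
		if !ok {
			return true // keep walking into non-call nodes
		}
		if sel, ok := call.Fun.(*ast.SelectorExpr); ok && sel.Sel.Name == "Publish" {
			hits = append(hits, fset.Position(call.Pos()).String())
		}
		return true
	})
	return hits, nil
}

// handlerSrc is a hypothetical handler that violates the rule on line 3.
const handlerSrc = `package handlers

func (h *Handler) Create(nc Conn) error { return nc.Publish("provisioning.requested", nil) }
`

func main() {
	hits, err := directPublishCalls("handler.go", handlerSrc)
	if err != nil {
		panic(err)
	}
	fmt.Println(hits)
}
```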