23 releases (5 breaking)
| Version | Released |
|---|---|
| 0.6.0 (new) | Jun 8, 2026 |
| 0.5.0 | May 15, 2026 |
| 0.4.6 | May 13, 2026 |
| 0.3.6 | May 6, 2026 |
| 0.1.5 | Mar 13, 2026 |
#24 in Procedural macros
94KB, 2K SLoC
causal-rs
Event-sourcing runtime for Rust, Kurrent-aligned.
causal-rs is an event-driven runtime with a typed
Event → Reactor → Event loop, designed to run against
KurrentDB (formerly EventStoreDB) as the
durable event log, with Postgres as the reactor outbox + cursor
store. It also runs entirely in-memory for tests.
The library's vocabulary mirrors KurrentDB's exactly where the
concepts overlap (EventData / RecordedEvent, causation_id /
correlation_id, StreamRevision, StreamState,
$correlationId / $causationId metadata). A KurrentDB developer
should be able to read the API and recognize every term.
use causal::{Engine, EngineBuilder, EventLogBackend, CheckpointStore, ReactorOutbox};
use causal::types::StreamState;
use causal::MemoryStore;
use std::sync::Arc;

let store = Arc::new(MemoryStore::new());
let engine = EngineBuilder::new(
    store.clone() as Arc<dyn EventLogBackend>,
    store.clone() as Arc<dyn CheckpointStore>,
    store.clone() as Arc<dyn ReactorOutbox>,
)
.with_aggregators(order_aggregators()) // Vec<Aggregator>
.with_reactor(ShipOnPlaced)            // impl Reactor
.build();

// Emit an event. Engine derives stream name from `Event::CATEGORY +
// Event::stream_id()`, stamps causation/correlation, persists, and
// drives downstream reactors to quiescence on `.settled()`.
engine.emit(OrderPlaced { order_id, total: 99.99 })
    .causation_id(trigger_event_id)
    .settled()
    .await?;
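The Event → Reactor → Event loop and the causation/correlation stamping described above can be illustrated with a small std-only model. This is a sketch under stated assumptions, not the causal-rs implementation: `ToyEvent`, `ToyReactor`, `settle`, and the two sample reactors are hypothetical names, event ids are plain counters rather than UUIDs, and a root event is assumed to start its own correlation chain.

```rust
use std::collections::VecDeque;

/// Toy recorded event. Field names follow the README's vocabulary, but this
/// struct is a hypothetical stand-in, not causal-rs's `RecordedEvent`.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyEvent {
    pub event_id: u64,
    pub event_type: &'static str,
    pub causation_id: Option<u64>,
    pub correlation_id: u64,
}

/// A toy reactor maps one recorded event to zero or more follow-up event types.
pub type ToyReactor = fn(&ToyEvent) -> Vec<&'static str>;

/// Append a root event, then keep dispatching until no reactor emits anything
/// new (quiescence). Returns the log in append order.
pub fn settle(root: &'static str, reactors: &[ToyReactor]) -> Vec<ToyEvent> {
    let mut log: Vec<ToyEvent> = Vec::new();
    let mut pending: VecDeque<ToyEvent> = VecDeque::new();
    let mut next_id = 1u64;

    // Assumption: a root event has no cause and is correlated to itself.
    pending.push_back(ToyEvent {
        event_id: next_id,
        event_type: root,
        causation_id: None,
        correlation_id: next_id,
    });
    next_id += 1;

    while let Some(event) = pending.pop_front() {
        log.push(event.clone());
        for reactor in reactors {
            for event_type in reactor(&event) {
                // A follow-up is caused by the event that triggered it and
                // inherits that event's correlation id.
                pending.push_back(ToyEvent {
                    event_id: next_id,
                    event_type,
                    causation_id: Some(event.event_id),
                    correlation_id: event.correlation_id,
                });
                next_id += 1;
            }
        }
    }
    log
}

/// Sample reactor: request a shipment when an order is placed.
pub fn ship_on_placed(e: &ToyEvent) -> Vec<&'static str> {
    if e.event_type == "OrderPlaced" { vec!["ShipmentRequested"] } else { vec![] }
}

/// Sample reactor: notify the customer when a shipment is requested.
pub fn notify_on_shipment(e: &ToyEvent) -> Vec<&'static str> {
    if e.event_type == "ShipmentRequested" { vec!["CustomerNotified"] } else { vec![] }
}
```

Calling `settle("OrderPlaced", &reactors)` with both sample reactors yields a three-event log in which each follow-up names its trigger as `causation_id` and all three share the root's `correlation_id`. The model runs synchronously in one thread; persistence, the outbox, and async scheduling are deliberately left out.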
Status
Pre-1.0; breaking changes expected. The 2026-05-14 release
finished a KurrentDB-vocabulary alignment pass (parent_id →
causation_id, NewEvent → EventData, etc.). See
CHANGELOG.md [Unreleased] for the migration
matrix and docs/MIGRATION_0.4.md for the
step-by-step guide.
The library is being prepared for production deployment in rootsignal on KurrentDB.
Crates
- causal: core engine, Event trait, Reactor / Projector traits, EngineBuilder, in-memory MemoryStore backend.
- causal_replay: durable backend implementations, namely PgEventLogBackend, PgReactorOutbox, PgSnapshotStore and KurrentEventLogBackend (behind feature flags), plus the cross-backend conformance suite.
- causal_core_macros: #[fact], #[aggregator], #[aggregators] proc macros.
- causal_inspector: read-model API for an inspector UI.
- causal_utils: internal helpers.
Recommended production shape
The roadmap calls for a hybrid backend (Option B):
- KurrentDB as the event log (KurrentEventLogBackend).
- Postgres as the reactor outbox + cursors + snapshots (PgReactorOutbox, PgSnapshotStore).
Kurrent does what it excels at, being the event store; the queue / cursor work is inherently relational and stays on Postgres.
let kurrent = KurrentEventLogBackend::connect("esdb://localhost:2113?tls=false")?;
let pg = Arc::new(PgReactorOutbox::new(pool));

let engine = EngineBuilder::new(
    Arc::new(kurrent) as Arc<dyn EventLogBackend>,
    pg.clone() as Arc<dyn CheckpointStore>,
    pg.clone() as Arc<dyn ReactorOutbox>,
).build();
KurrentDB vocabulary mapping
| Kurrent term | causal-rs |
|---|---|
| Event (write) | EventData |
| Event (read) | RecordedEvent |
| event_type (stored field) | composed {Event::CATEGORY}:{event.event_type()} |
| Category | Event::CATEGORY |
| Stream id | Event::stream_id() -> Uuid |
| Stream name | {CATEGORY}-{stream_id} (composed automatically; causal::stream_name_for::<F>(id) exposes it) |
| Stream revision | StreamRevision (0-indexed) |
| $all commit position | LogCursor |
| StreamState for OCC | causal::types::StreamState (same variants) |
| ExpectedRevision | StreamState::StreamRevision(u64) |
| Persistent subscription | Reactor (extends with atomic emit on top) |
| Group name | Reactor::GROUP_NAME / Projector::GROUP_NAME |
| correlation_id | correlation_id |
| causation_id | causation_id |
| $correlationId metadata | stamped automatically; server-side $by_correlation_id works natively |
| $causationId metadata | stamped automatically; server-side $by_causation_id works natively |
Deliberate divergence: Reactor vs Kurrent's PersistentSubscription
— Reactor adds atomic emit on top of the subscription contract. The
rest of the vocabulary is aligned 1:1.
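The two composition rules in the mapping table, `{CATEGORY}-{stream_id}` for stream names and `{CATEGORY}:{event_type}` for the stored event type, can be sketched as follows. The trait and helpers here (`NamedEvent`, `stream_name`, `stored_event_type`) are hypothetical stand-ins, not the crate's `Event` trait or `stream_name_for`; in particular, `stream_id()` returns a `String` to stay std-only, whereas the table says the real method returns a `Uuid`.

```rust
/// Hypothetical stand-in for the README's `Event` trait.
/// Assumption: the real trait returns a `Uuid` from `stream_id()`.
pub trait NamedEvent {
    const CATEGORY: &'static str;
    fn stream_id(&self) -> String;
    fn event_type(&self) -> &'static str;
}

/// Stream name as described in the table: `{CATEGORY}-{stream_id}`.
pub fn stream_name<E: NamedEvent>(event: &E) -> String {
    format!("{}-{}", E::CATEGORY, event.stream_id())
}

/// Stored event type as described in the table: `{CATEGORY}:{event_type}`.
pub fn stored_event_type<E: NamedEvent>(event: &E) -> String {
    format!("{}:{}", E::CATEGORY, event.event_type())
}

/// Sample event used only for illustration.
pub struct OrderPlaced {
    pub order_id: String,
}

impl NamedEvent for OrderPlaced {
    const CATEGORY: &'static str = "order";

    fn stream_id(&self) -> String {
        self.order_id.clone()
    }

    fn event_type(&self) -> &'static str {
        "OrderPlaced"
    }
}
```

For an `OrderPlaced` whose `order_id` is `"42"`, `stream_name` returns `"order-42"` and `stored_event_type` returns `"order:OrderPlaced"`.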
Backend conformance
Every EventLogBackend impl runs the same suite: append
idempotency on event_id, CAS via StreamState, monotonic
revisions, strict-after read_stream / read_all semantics, and
stream isolation. A new trait method is covered by extending the
suite once, and a new backend picks up every existing assertion. See
modules/causal_replay/src/conformance.rs.
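The shape of such a suite, one generic function that any backend must pass, can be sketched with a much-reduced trait. Everything below is an illustrative assumption rather than the contents of conformance.rs: `ToyLog`, `MemoryLog`, `AppendError`, and `run_conformance` are hypothetical, event ids are `u64` instead of UUIDs, `read_all` is omitted, and the local `StreamState` enum carries only three variants.

```rust
use std::collections::HashMap;

/// Expected stream state for optimistic concurrency (local stand-in enum).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StreamState {
    Any,
    NoStream,
    StreamRevision(u64),
}

#[derive(Debug, PartialEq)]
pub enum AppendError {
    WrongExpectedRevision,
}

/// Hypothetical, much-reduced backend trait.
pub trait ToyLog {
    /// Appends one event and returns its 0-indexed revision in the stream.
    fn append(&mut self, stream: &str, event_id: u64, expected: StreamState)
        -> Result<u64, AppendError>;
    /// Returns event ids with revision strictly greater than `after`
    /// (`None` reads from the start).
    fn read_stream(&self, stream: &str, after: Option<u64>) -> Vec<u64>;
}

#[derive(Default)]
pub struct MemoryLog {
    streams: HashMap<String, Vec<u64>>,
    seen: HashMap<u64, u64>, // event_id -> revision assigned on first append
}

impl ToyLog for MemoryLog {
    fn append(&mut self, stream: &str, event_id: u64, expected: StreamState)
        -> Result<u64, AppendError> {
        // Idempotency: replaying a known event_id returns its original revision.
        if let Some(&rev) = self.seen.get(&event_id) {
            return Ok(rev);
        }
        let events = self.streams.entry(stream.to_string()).or_default();
        let len = events.len() as u64;
        // Compare-and-swap on the caller's expected stream state.
        let ok = match expected {
            StreamState::Any => true,
            StreamState::NoStream => len == 0,
            StreamState::StreamRevision(r) => r.checked_add(1) == Some(len),
        };
        if !ok {
            return Err(AppendError::WrongExpectedRevision);
        }
        events.push(event_id);
        self.seen.insert(event_id, len);
        Ok(len)
    }

    fn read_stream(&self, stream: &str, after: Option<u64>) -> Vec<u64> {
        let start = after.map_or(0, |r| r as usize + 1);
        self.streams
            .get(stream)
            .map(|events| events.iter().skip(start).copied().collect())
            .unwrap_or_default()
    }
}

/// One suite, written once, run against any `ToyLog` implementation.
pub fn run_conformance<L: ToyLog>(mut log: L) {
    // CAS via StreamState, with monotonic 0-indexed revisions.
    assert_eq!(log.append("order-1", 10, StreamState::NoStream), Ok(0));
    assert_eq!(
        log.append("order-1", 11, StreamState::NoStream),
        Err(AppendError::WrongExpectedRevision)
    );
    assert_eq!(log.append("order-1", 11, StreamState::StreamRevision(0)), Ok(1));
    // Append idempotency on event_id.
    assert_eq!(log.append("order-1", 11, StreamState::Any), Ok(1));
    assert_eq!(log.read_stream("order-1", None), vec![10, 11]);
    // Strict-after read semantics.
    assert_eq!(log.read_stream("order-1", Some(0)), vec![11]);
    // Stream isolation.
    assert_eq!(log.append("order-2", 20, StreamState::NoStream), Ok(0));
    assert_eq!(log.read_stream("order-1", None), vec![10, 11]);
}
```

A backend's test module would then reduce to a single call such as `run_conformance(MemoryLog::default())`, which is what lets a new assertion reach every implementation at once.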
Schema
docs/schema.sql — authoritative v0.4 Postgres
schema (causal_log, causal_outbox, causal_checkpoints,
causal_snapshots, causal_projection_cursors). The Kurrent-alignment
column renames are in
migrations/20260514_kurrent_alignment.sql.
License
MIT.
Dependencies
~89–425KB, ~10K SLoC