23 releases (5 breaking)

new 0.6.0 Jun 8, 2026
0.5.0 May 15, 2026
0.4.6 May 13, 2026
0.3.6 May 6, 2026
0.1.5 Mar 13, 2026

#24 in Procedural macros


MIT license

94KB
2K SLoC

causal-rs

Event-sourcing runtime for Rust, Kurrent-aligned.

causal-rs is an event-driven runtime with a typed Event → Reactor → Event loop, designed to run against KurrentDB (formerly EventStoreDB) as the durable event log, with Postgres as the reactor outbox + cursor store. It also runs entirely in-memory for tests.

The library's vocabulary mirrors KurrentDB's exactly where the concepts overlap (EventData / RecordedEvent, causation_id / correlation_id, StreamRevision, StreamState, $correlationId / $causationId metadata). A KurrentDB developer should be able to read the API and recognize every term.

use causal::{Engine, EngineBuilder, EventLogBackend, CheckpointStore, ReactorOutbox};
use causal::types::StreamState;
use causal::MemoryStore;
use std::sync::Arc;

let store = Arc::new(MemoryStore::new());
let engine = EngineBuilder::new(
        store.clone() as Arc<dyn EventLogBackend>,
        store.clone() as Arc<dyn CheckpointStore>,
        store.clone() as Arc<dyn ReactorOutbox>,
    )
    .with_aggregators(order_aggregators())   // Vec<Aggregator>
    .with_reactor(ShipOnPlaced)               // impl Reactor
    .build();

// Emit an event. Engine derives stream name from `Event::CATEGORY +
// Event::stream_id()`, stamps causation/correlation, persists, and
// drives downstream reactors to quiescence on `.settled()`.
engine.emit(OrderPlaced { order_id, total: 99.99 })
    .causation_id(trigger_event_id)
    .settled()
    .await?;

Status

Pre-1.0; breaking changes expected. The 2026-05-14 release finished a KurrentDB-vocabulary alignment pass (parent_idcausation_id, NewEventEventData, etc.). See CHANGELOG.md [Unreleased] for the migration matrix and docs/MIGRATION_0.4.md for the step-by-step guide.

The library is being prepared for production deployment in rootsignal on KurrentDB.

Crates

  • causal — core engine, Event trait, Reactor / Projector traits, EngineBuilder, in-memory MemoryStore backend.
  • causal_replay — durable backend implementations: PgEventLogBackend, PgReactorOutbox, PgSnapshotStore and KurrentEventLogBackend (behind feature flags), plus the cross-backend conformance suite.
  • causal_core_macros#[fact], #[aggregator], #[aggregators] proc macros.
  • causal_inspector — read-model API for an inspector UI.
  • causal_utils — internal helpers.

The roadmap calls for a hybrid backend (Option B):

Kurrent is the event store it excels at being; the queue / cursor work is inherently relational and stays on Postgres.

let kurrent = KurrentEventLogBackend::connect("esdb://localhost:2113?tls=false")?;
let pg = Arc::new(PgReactorOutbox::new(pool));

let engine = EngineBuilder::new(
    Arc::new(kurrent) as Arc<dyn EventLogBackend>,
    pg.clone()       as Arc<dyn CheckpointStore>,
    pg.clone()       as Arc<dyn ReactorOutbox>,
).build();

KurrentDB vocabulary mapping

Kurrent term causal-rs
Event (write) EventData
Event (read) RecordedEvent
event_type (stored field) composed {Event::CATEGORY}:{event.event_type()}
Category Event::CATEGORY
Stream id Event::stream_id() -> Uuid
Stream name {CATEGORY}-{stream_id} (composed automatically; causal::stream_name_for::<F>(id) exposes it)
Stream revision StreamRevision (0-indexed)
$all commit position LogCursor
StreamState for OCC causal::types::StreamState (same variants)
ExpectedRevision StreamState::StreamRevision(u64)
Persistent subscription Reactor (extends with atomic emit on top)
Group name Reactor::GROUP_NAME / Projector::GROUP_NAME
correlation_id correlation_id
causation_id causation_id
$correlationId metadata stamped automatically — server-side $by_correlation_id works natively
$causationId metadata stamped automatically — server-side $by_causation_id works natively

Deliberate divergence: Reactor vs Kurrent's PersistentSubscription — Reactor adds atomic emit on top of the subscription contract. The rest of the vocabulary is aligned 1:1.

Backend conformance

Every EventLogBackend impl runs the same suite — append idempotency on event_id, CAS via StreamState, monotonic revisions, strict-after read_stream / read_all semantics, stream isolation. Adding a new backend or trait method extends the suite once and the assertion runs against every impl. See modules/causal_replay/src/conformance.rs.

Schema

docs/schema.sql — authoritative v0.4 Postgres schema (causal_log, causal_outbox, causal_checkpoints, causal_snapshots, causal_projection_cursors). The Kurrent-alignment column renames are in migrations/20260514_kurrent_alignment.sql.

License

MIT.

Dependencies

~89–425KB
~10K SLoC