Event sourced entities in Rust.

- `eventsourced`: core library with `EventSourced`, `Entity`, `EvtLog`, `SnapshotStore`, etc.
- `eventsourced-nats`: NATS implementation for eventsourced `EvtLog` and `SnapshotStore`
- `eventsourced-postgres`: Postgres implementation for eventsourced `EvtLog` and `SnapshotStore`

EventSourced is inspired to a large degree by the excellent Akka Persistence library.
The `EventSourced` trait defines types for commands, events, snapshot state and errors as well as functions for command handling, event handling and setting a snapshot state.
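
To make the shape of such a trait concrete, here is a minimal, synchronous sketch. It is not the trait shipped by this crate: `EventSourcedSketch`, the method name `set_state` and the `Switch` entity with its commands, events and errors are hypothetical and only mirror the description above.

```rust
/// Simplified, synchronous stand-in for an event sourced entity
/// (hypothetical; not the trait shipped by this crate).
trait EventSourcedSketch {
    type Cmd;
    type Evt;
    type State;
    type Error;

    /// Validate a command and return the events to be persisted.
    fn handle_cmd(&self, cmd: Self::Cmd) -> Result<Vec<Self::Evt>, Self::Error>;

    /// Apply a persisted event; return `Some(state)` to request a snapshot.
    fn handle_evt(&mut self, seq_no: u64, evt: &Self::Evt) -> Option<Self::State>;

    /// Restore the entity from a snapshot state (assumed method name).
    fn set_state(&mut self, state: Self::State);
}

/// Hypothetical entity: a switch that can only be turned on while off and vice versa.
#[derive(Debug, Default, PartialEq)]
struct Switch {
    on: bool,
}

#[derive(Debug, PartialEq)]
enum SwitchCmd {
    TurnOn,
    TurnOff,
}

#[derive(Debug, PartialEq)]
enum SwitchEvt {
    TurnedOn,
    TurnedOff,
}

#[derive(Debug, PartialEq)]
enum SwitchError {
    AlreadyOn,
    AlreadyOff,
}

impl EventSourcedSketch for Switch {
    type Cmd = SwitchCmd;
    type Evt = SwitchEvt;
    type State = bool;
    type Error = SwitchError;

    fn handle_cmd(&self, cmd: SwitchCmd) -> Result<Vec<SwitchEvt>, SwitchError> {
        // Commands are validated against the current state; the state is not changed here.
        match (cmd, self.on) {
            (SwitchCmd::TurnOn, false) => Ok(vec![SwitchEvt::TurnedOn]),
            (SwitchCmd::TurnOn, true) => Err(SwitchError::AlreadyOn),
            (SwitchCmd::TurnOff, true) => Ok(vec![SwitchEvt::TurnedOff]),
            (SwitchCmd::TurnOff, false) => Err(SwitchError::AlreadyOff),
        }
    }

    fn handle_evt(&mut self, _seq_no: u64, evt: &SwitchEvt) -> Option<bool> {
        // Only events change the state.
        self.on = matches!(evt, SwitchEvt::TurnedOn);
        None // this sketch never requests a snapshot
    }

    fn set_state(&mut self, state: bool) {
        self.on = state;
    }
}
```

The split matters: `handle_cmd` takes `&self` and can only decide, while `handle_evt` takes `&mut self` and is the single place where state changes, which is what makes replaying persisted events possible.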
The `EvtLog` and `SnapshotStore` traits define a pluggable event log and a pluggable snapshot store respectively. Implementations for NATS and Postgres are already provided.
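
What "pluggable" means here can be illustrated with a small in-memory backend. This is a sketch under several assumptions: the trait `EvtLogSketch`, its two methods and `InMemoryEvtLog` are hypothetical, the API is synchronous, entity ids are plain `u32` values instead of `Uuid` to stay within the standard library, and sequence numbers are assumed to start at 1.

```rust
use std::collections::HashMap;

/// Hypothetical, synchronous stand-in for a pluggable event log.
trait EvtLogSketch {
    /// Append serialized events for an entity and return the last sequence number.
    fn persist(&mut self, id: u32, evts: Vec<Vec<u8>>) -> u64;

    /// Return all events with a sequence number >= `from_seq_no`, in order.
    fn evts_from(&self, id: u32, from_seq_no: u64) -> Vec<(u64, Vec<u8>)>;
}

/// In-memory backend: one append-only vector of serialized events per entity.
#[derive(Default)]
struct InMemoryEvtLog {
    evts: HashMap<u32, Vec<Vec<u8>>>,
}

impl EvtLogSketch for InMemoryEvtLog {
    fn persist(&mut self, id: u32, evts: Vec<Vec<u8>>) -> u64 {
        let log = self.evts.entry(id).or_default();
        log.extend(evts);
        // With sequence numbers starting at 1, the last one equals the log length.
        log.len() as u64
    }

    fn evts_from(&self, id: u32, from_seq_no: u64) -> Vec<(u64, Vec<u8>)> {
        match self.evts.get(&id) {
            Some(log) => log
                .iter()
                .enumerate()
                .map(|(n, evt)| (n as u64 + 1, evt.clone()))
                .filter(|(seq_no, _)| *seq_no >= from_seq_no)
                .collect::<Vec<_>>(),
            None => Vec::new(),
        }
    }
}

/// Code written against the trait works with any backend.
fn count_evts(evt_log: &impl EvtLogSketch, id: u32) -> usize {
    evt_log.evts_from(id, 1).len()
}
```

A function like `count_evts` never names a concrete backend, so swapping the in-memory log for a NATS or Postgres based one would not change the calling code.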
The `Entity` struct and its associated `spawn` function create "running" instances of an `EventSourced` implementation, identifiable by a `Uuid`, for some event log and some snapshot store. Conversion of events and snapshot state to and from bytes happens via functions; for prost and serde_json these are already provided.
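
Passing conversion "via functions" can be pictured as handing over a pair of plain functions. The following sketch assumes a hypothetical `Converter` struct and a hand-written byte format (one tag byte plus eight little-endian value bytes); it does not reproduce the conversion types of this crate, nor the prost or serde_json formats.

```rust
/// Hypothetical event type for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum Evt {
    Increased(u64),
    Decreased(u64),
}

/// Bundle of conversion functions, passed around as a plain value (hypothetical shape).
struct Converter<E> {
    to_bytes: fn(&E) -> Vec<u8>,
    from_bytes: fn(&[u8]) -> Result<E, String>,
}

/// Encode as one tag byte followed by the value as 8 little-endian bytes.
fn evt_to_bytes(evt: &Evt) -> Vec<u8> {
    let (tag, value) = match evt {
        Evt::Increased(value) => (0u8, *value),
        Evt::Decreased(value) => (1u8, *value),
    };
    let mut bytes = vec![tag];
    bytes.extend_from_slice(&value.to_le_bytes());
    bytes
}

/// Decode the format written by `evt_to_bytes`, rejecting malformed input.
fn evt_from_bytes(bytes: &[u8]) -> Result<Evt, String> {
    let (tag, rest) = match bytes.split_first() {
        Some(parts) => parts,
        None => return Err("empty input".to_string()),
    };
    if rest.len() != 8 {
        return Err(format!("expected 8 value bytes, got {}", rest.len()));
    }
    let mut value = [0u8; 8];
    value.copy_from_slice(rest);
    let value = u64::from_le_bytes(value);
    match *tag {
        0 => Ok(Evt::Increased(value)),
        1 => Ok(Evt::Decreased(value)),
        other => Err(format!("unknown tag {}", other)),
    }
}

/// Build the converter for `Evt` from the two functions above.
fn evt_converter() -> Converter<Evt> {
    Converter {
        to_bytes: evt_to_bytes,
        from_bytes: evt_from_bytes,
    }
}
```

Because the converter is just a value, an entity can be spawned with a different wire format without touching its command or event handling.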
Before building the project and examples, please make sure you have installed the protobuf dependency. It is needed not only for the optional byte conversion with prost, which is a default feature, but also for eventsourced-nats. The only way to get away without `protobuf` is to change the default features and not build eventsourced-nats.
On macOS `protobuf` can be installed via Homebrew:
brew install protobuf
The `counter` package in the `examples` directory contains a simple example: a counter which handles `Inc` and `Dec` commands and emits/handles `Increased` and `Decreased` events.
```rust
impl EventSourced for Counter {
    ...

    /// Command handler, returning the to be persisted events or an error.
    fn handle_cmd(&self, cmd: Self::Cmd) -> Result<Vec<Self::Evt>, Self::Error> {
        match cmd {
            Cmd::Inc(inc) => {
                // Validate command: overflow. Compare against the remaining headroom,
                // because `inc + self.value` could itself overflow.
                if inc > u64::MAX - self.value {
                    Err(Error::Overflow {
                        value: self.value,
                        inc,
                    })
                }
                // Valid Inc command results in Increased event.
                else {
                    Ok(vec![Evt {
                        evt: Some(evt::Evt::Increased(Increased {
                            old_value: self.value,
                            inc,
                        })),
                    }])
                }
            }
            ...
        }
    }

    /// Event handler, returning whether to take a snapshot or not.
    fn handle_evt(&mut self, seq_no: u64, evt: &Self::Evt) -> Option<Self::State> {
        match evt.evt {
            Some(evt::Evt::Increased(Increased { old_value, inc })) => {
                self.value += inc;
                debug!(seq_no, old_value, inc, value = self.value, "Increased");
            }
            ...
        }

        // Request a snapshot every `snapshot_after` events, if configured.
        self.snapshot_after.and_then(|snapshot_after| {
            if seq_no % snapshot_after == 0 {
                Some(self.value)
            } else {
                None
            }
        })
    }

    ...
}
```
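
The overflow validation and the snapshot cadence from the excerpt above can be tried in isolation. The following is a standalone sketch, not the example's code: it replaces the prost-generated types with plain enums, and the method name `handle_inc` is hypothetical. It checks for overflow by comparing `inc` against the remaining headroom `u64::MAX - value`, since the sum of two `u64` values cannot be compared against `u64::MAX` after the fact.

```rust
/// Plain stand-in for the generated event type (assumption of this sketch).
#[derive(Debug, PartialEq)]
enum Evt {
    Increased { old_value: u64, inc: u64 },
}

#[derive(Debug, PartialEq)]
enum Error {
    Overflow { value: u64, inc: u64 },
}

#[derive(Debug, Default)]
struct Counter {
    value: u64,
    /// Take a snapshot every `snapshot_after` events, if set (must not be 0).
    snapshot_after: Option<u64>,
}

impl Counter {
    /// Validate an increment and return the event to be persisted.
    fn handle_inc(&self, inc: u64) -> Result<Vec<Evt>, Error> {
        // `u64::MAX - self.value` is the headroom left and can never underflow.
        if inc > u64::MAX - self.value {
            Err(Error::Overflow {
                value: self.value,
                inc,
            })
        } else {
            Ok(vec![Evt::Increased {
                old_value: self.value,
                inc,
            }])
        }
    }

    /// Apply an event and return `Some(value)` when a snapshot is due.
    fn handle_evt(&mut self, seq_no: u64, evt: &Evt) -> Option<u64> {
        match evt {
            Evt::Increased { inc, .. } => self.value += *inc,
        }
        self.snapshot_after.and_then(|snapshot_after| {
            if seq_no % snapshot_after == 0 {
                Some(self.value)
            } else {
                None
            }
        })
    }
}
```

With `snapshot_after: Some(2)`, a snapshot is requested at sequence numbers 2, 4, 6 and so on; with `None`, never.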
There are also the two `counter-nats` and `counter-postgres` packages, with a binary crate each, using `eventsourced-nats` and `eventsourced-postgres` respectively for the event log.
```rust
...
let evt_log = evt_log.clone();
let snapshot_store = snapshot_store.clone();
let counter = Counter::default().with_snapshot_after(config.snapshot_after);
let counter = Entity::spawn(
    id,
    counter,
    42,
    evt_log,
    snapshot_store,
    convert::prost::binarizer(),
)
.await
.context("Cannot spawn entity")?;

tasks.spawn(async move {
    for n in 0..config.evt_count / 2 {
        if n > 0 && n % 2500 == 0 {
            println!("{id}: {} events persisted", n * 2);
        }
        let _ = counter
            .handle_cmd(Cmd::Inc(n as u64))
            .await
            .context("Cannot handle Inc command")
            .unwrap()
            .context("Invalid command")
            .unwrap();
        let _ = counter
            .handle_cmd(Cmd::Dec(n as u64))
            .await
            .context("Cannot handle Dec command")
            .unwrap()
            .context("Invalid command")
            .unwrap();
    }
});
...
```
Take a look at the examples directory for more details.
For the `counter-nats` example, nats-server needs to be installed. On macOS just use Homebrew:
brew install nats-server
Before running the example, start the nats-server with the `jetstream` feature enabled:
nats-server --jetstream
Then use the following command to run the example:
RUST_LOG=info \
CONFIG_DIR=examples/counter-nats/config \
cargo run \
--release \
--package counter-nats
Notice that you can change the configuration either by changing the `default.yaml` file at `examples/counter-nats/config` or by overriding the configuration settings with environment variables, e.g. `APP__COUNTER__EVT_COUNT=42`:
RUST_LOG=info \
APP__COUNTER__EVT_COUNT=42 \
CONFIG_DIR=examples/counter-nats/config \
cargo run \
--release \
--package counter-nats
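
The variable name `APP__COUNTER__EVT_COUNT` suggests that double underscores separate a prefix (`APP`) from the nesting levels of a setting. The following sketch spells out that reading; it is an assumption about the naming convention, the function `config_path` is hypothetical, and the examples' actual configuration loading may work differently.

```rust
/// Map an environment variable name like `APP__COUNTER__EVT_COUNT` to a
/// configuration path like `["counter", "evt_count"]`.
///
/// Returns `None` if the name does not start with `prefix` followed by `__`,
/// or if any path segment is empty.
fn config_path(name: &str, prefix: &str) -> Option<Vec<String>> {
    let mut parts = name.split("__");
    if parts.next()? != prefix {
        return None;
    }
    // Single underscores stay part of a key; only `__` separates levels.
    let path: Vec<String> = parts.map(|part| part.to_lowercase()).collect();
    if path.is_empty() || path.iter().any(|part| part.is_empty()) {
        None
    } else {
        Some(path)
    }
}
```

Under this reading, `APP__COUNTER__EVT_COUNT=42` would override an `evt_count` setting nested under `counter`, while unrelated variables such as `RUST_LOG` are ignored.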
For the `counter-postgres` example, PostgreSQL needs to be installed. On macOS just use Homebrew:
brew install postgresql@14
Before running the example, start PostgreSQL:
brew services run postgresql@14
Make sure you know the following connection parameters:

- host
- port
- user
- password
- dbname

Change the configuration either by changing the `default.yaml` file at `examples/counter-postgres/config` or by overriding the configuration settings with environment variables, e.g. `APP__EVT_LOG__DBNAME=test` or `APP__COUNTER__EVT_COUNT=42`.
Then use the following command to run the example:
RUST_LOG=info \
APP__EVT_LOG__DBNAME=test \
APP__COUNTER__EVT_COUNT=42 \
CONFIG_DIR=examples/counter-postgres/config \
cargo run \
--release \
--package counter-postgres
This code is open source software licensed under the Apache 2.0 License.