Yet another one Rust crate for implementing nodes for https://github.com/jepsen-io/maelstrom and solve https://fly.io/dist-sys/ challenges.
Maelstrom is a platform for learning distributed systems. It is build around Jepsen and Elle to ensure no properties are violated. With maelstrom you build nodes that form distributed system that can process different workloads.
```bash $ cargo build --examples $ maelstrom test -w echo --bin ./target/debug/examples/echo --node-count 1 --time-limit 10 --log-stderr ````
implementation:
```rust use asynctrait::asynctrait; use maelstrom::protocol::Message; use maelstrom::{Node, Result, Runtime}; use std::sync::Arc;
pub(crate) fn main() -> Result<()> { Runtime::init(try_main()) }
async fn trymain() -> Result<()> { let handler = Arc::new(Handler::default()); Runtime::new().withhandler(handler).run().await }
struct Handler {}
impl Node for Handler { async fn process(&self, runtime: Runtime, req: Message) -> Result<()> { if req.gettype() == "echo" { let echo = req.body.clone().withtype("echo_ok"); return runtime.reply(req, echo).await; }
done(runtime, message)
}
} ```
spec:
receiving
{
"src": "c1",
"dest": "n1",
"body": {
"type": "echo",
"msg_id": 1,
"echo": "Please echo 35"
}
}
send back the same msg with body.type == echo_ok.
{
"src": "n1",
"dest": "c1",
"body": {
"type": "echo_ok",
"msg_id": 1,
"in_reply_to": 1,
"echo": "Please echo 35"
}
}
```bash $ cargo build --examples $ RUST_LOG=debug maelstrom test -w broadcast --bin ./target/debug/examples/broadcast --node-count 2 --time-limit 20 --rate 10 --log-stderr ````
implementation:
```bash use asynctrait::asynctrait; use log::info; use maelstrom::protocol::{Message, MessageBody}; use maelstrom::{done, Node, Result, Runtime}; use serde::{Deserialize, Serialize}; use std::sync::{Arc, Mutex};
pub(crate) fn main() -> Result<()> { Runtime::init(try_main()) }
async fn trymain() -> Result<()> { let handler = Arc::new(Handler::default()); Runtime::new().withhandler(handler).run().await }
struct Handler {
inner: Arc
impl Node for Handler { async fn process(&self, runtime: Runtime, req: Message) -> Result<()> { match req.gettype() { "read" => { let data = self.snapshot(); let msg = ReadResponse { messages: data }; return runtime.reply(req, msg).await; } "broadcast" => { let msg = BroadcastRequest::frommessage(&req.body)?;
self.add(msg.value);
if !runtime.is_from_cluster(&req.src) {
for node in runtime.neighbours() {
runtime.spawn(Runtime::rpc(runtime.clone(), node.clone(), msg.clone()));
}
}
return runtime.reply_ok(req).await;
}
"topology" => {
info!("new topology {:?}", req.body.extra.get("topology").unwrap());
return runtime.reply_ok(req).await;
}
_ => done(runtime, req),
}
}
}
impl Handler {
fn snapshot(&self) -> Vec
fn add(&self, val: u64) {
self.inner.lock().unwrap().push(val);
}
}
struct BroadcastRequest { #[serde(default, rename = "type")] typ: String, #[serde(default, rename = "message")] value: u64, }
struct ReadResponse {
messages: Vec
impl BroadcastRequest {
fn frommessage(m: &MessageBody) -> Result
```bash $ cargo build --examples $ RUSTLOG=debug ~/Projects/maelstrom/maelstrom test -w lin-kv --bin ./target/debug/examples/linkv --node-count 4 --concurrency 2n --time-limit 10 --rate 100 --log-stderr ````
implementation:
```rust use asynctrait::asynctrait; use maelstrom::kv::{linkv, Storage, KV}; use maelstrom::protocol::Message; use maelstrom::{done, Node, Result, Runtime}; use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokiocontext::context::Context;
pub(crate) fn main() -> Result<()> { Runtime::init(try_main()) }
async fn trymain() -> Result<()> { let runtime = Runtime::new(); let handler = Arc::new(handler(runtime.clone())); runtime.withhandler(handler).run().await }
struct Handler { s: Storage, }
impl Node for Handler {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
let msg: Result
fn handler(runtime: Runtime) -> Handler { Handler { s: lin_kv(runtime) } }
enum Request { Read { key: u64, }, ReadOk { value: i64, }, Write { key: u64, value: i64, }, WriteOk {}, Cas { key: u64, from: i64, to: i64, #[serde(default, rename = "createifnot_exists")] put: bool, }, CasOk {}, } ```
```rust use asynctrait::asynctrait; use maelstrom::kv::{linkv, Storage, KV}; use maelstrom::protocol::Message; use maelstrom::{done, Node, Result, Runtime}; use tokiocontext::context::Context;
struct Handler { s: Storage, }
impl Node for Handler {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
let msg: Result
fn handler(runtime: Runtime) -> Handler { Handler { s: lin_kv(runtime) } } ```
```rust use asynctrait::asynctrait; use maelstrom::protocol::Message; use maelstrom::{Node, Result, Runtime}; use serde::{Deserialize, Serialize}; use std::sync::{Arc, Mutex};
struct Handler {}
impl Node for Handler { async fn process(&self, runtime: Runtime, req: Message) -> Result<()> { // 1. runtime.spawn(Runtime::rpc(runtime.clone(), node.clone(), msg.clone()));
// 2. put it into runtime.spawn(async move { ... }) if needed
let res: RPCResult = Runtime::rpc(runtime.clone(), node.clone(), msg.clone()).await?;
let _msg: Result<Message> = res.await;
// 3. put it into runtime.spawn(async move { ... }) if needed
let mut res: RPCResult = Runtime::rpc(runtime.clone(), node.clone(), msg.clone()).await?;
let (mut ctx, _handler) = Context::with_timeout(Duration::from_secs(1));
let _msg: Message = res.done_with(ctx).await?;
return runtime.reply_ok(req).await;
}
} ```
```rust use serde::{Deserialize, Serialize};
struct TopologyRequest {
topology: HashMap
// or
pub enum Message {
Topology {
topology: HashMap
```
```rust use asynctrait::asynctrait; use log::info; use maelstrom::protocol::Message; use maelstrom::{done, Node, Result, Runtime}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::sync::{Arc, Mutex};
struct Handler { /* ... */ }
impl Node for Handler { async fn process(&self, runtime: Runtime, req: Message) -> Result<()> { if req.gettype() == "echo" { let echo = req.body.clone().withtype("echo_ok"); return runtime.reply(req, echo).await; }
if req.get_type() == "echo" {
let echo = format!("Another echo {}", message.body.msg_id);
let msg = Value::Object(Map::from_iter([("echo".to_string(), Value::String(echo))]));
return runtime.reply(message, msg).await;
}
if req.get_type() == "echo" {
let err = maelstrom::Error::TemporarilyUnavailable {};
let body = ErrorMessageBody::from_error(err);
return runtime.reply(message, body).await;
}
if req.get_type() == "echo" {
let body = MessageBody::default().with_type("echo_ok").with_reply_to(req.body.msg_id);
// send: no response type auto-deduction and no reply_to
return runtime.send(message, body).await;
}
if req.get_type() == "echo" {
return runtime.reply(message, EchoResponse { echo: "blah".to_string() }).await;
}
if req.get_type() == "read" {
let data = self.inner.lock().unwrap().clone();
let msg = ReadResponse { messages: data };
return runtime.reply(req, msg).await;
}
if req.get_type() == "broadcast" {
let raw = Value::Object(req.body.extra.clone());
let mut msg = serde_json::from_value::<BroadcastRequest>(raw)?;
msg.typ = req.body.typ.clone();
return runtime.reply(req, msg).await;
}
if req.get_type() == "broadcast" {
let mut msg = serde_json::from_value::<BroadcastRequest>(req.body.raw())?;
msg.typ = req.body.typ.clone();
return runtime.reply(req, msg).await;
}
if req.get_type() == "broadcast" {
let mut msg = req.body.as_obj::<BroadcastRequest>()?;
msg.typ = req.body.typ.clone();
return runtime.reply(req, msg).await;
}
if req.get_type() == "topology" {
info!("new topology {:?}", req.body.extra.get("topology").unwrap());
return runtime.reply_ok(req).await;
}
done(runtime, message)
}
}
// Putting #[serde(rename = "type")] typo: String
is not necessary,
// as it is auto-deducted.
struct EchoResponse { echo: String, }
struct BroadcastRequest { #[serde(default, rename = "type")] typ: String, message: u64, }
// #[derive(Deserialize)]
// struct TopologyRequest {
// topology: HashMap
struct ReadResponse {
messages: Vec
Because I am learning Rust and I liked Maelstrom and fly.io pretty much. I wanted to play with different aspects of the language and ecosystem and build an API that will be somewhat convenient and short. I am sorry for my ego.
Thanks Aphyr and guys a lot.