Here is some example code showing how RSocket works in Rust.
Add the dependencies to your `Cargo.toml`:
```toml
[dependencies]
tokio = "0.3.6"
rsocket_rust = "0.7.0"
```
```rust use rsocketrust::prelude::*; use rsocketrust::utils::EchoRSocket; use rsocketrust::Result; use rsocketrusttransporttcp::TcpServerTransport;
async fn main() -> Result<()> { RSocketFactory::receive() .transport(TcpServerTransport::from("127.0.0.1:7878")) .acceptor(Box::new(|setup, socket| { println!("accept setup: {:?}", setup); Ok(Box::new(EchoRSocket)) // Or you can reject setup // Err(From::from("SETUPNOTALLOW")) })) .onstart(Box::new(|| println!("+++++++ echo server started! +++++++"))) .serve() .await } ```
```rust
use rsocket_rust::prelude::*;
use rsocket_rust::Result;
use rsocket_rust_transport_tcp::TcpClientTransport;

/// Client example: connects to `127.0.0.1:7878` over TCP, sends one
/// request-response call and prints the reply.
#[tokio::main]
async fn main() -> Result<()> {
    let cli = RSocketFactory::connect()
        .transport(TcpClientTransport::from("127.0.0.1:7878"))
        .setup(Payload::from("READY!"))
        .mime_type("text/plain", "text/plain")
        .on_close(Box::new(|| println!("connection closed")))
        .start()
        .await?;

    // Build a payload carrying both UTF-8 data and UTF-8 metadata.
    let req = Payload::builder()
        .set_data_utf8("Hello World!")
        .set_metadata_utf8("Rust")
        .build();
    let res = cli.request_response(req).await?;
    println!("got: {:?}", res);

    // If you want to block until socket disconnected.
    cli.wait_for_close().await;

    Ok(())
}
```
Example of accessing Redis (via the [`redis` crate](https://crates.io/crates/redis)):
NOTICE: add this dependency to your `Cargo.toml`: `redis = { version = "0.19.0", features = [ "aio" ] }`
```rust
use std::str::FromStr;

use redis::Client as RedisClient;
use rsocket_rust::async_trait;
use rsocket_rust::prelude::*;
use rsocket_rust::Result;

/// Data-access object that wraps a Redis client handle.
pub struct RedisDao {
    /// Client from which connections are obtained.
    inner: RedisClient,
}

// Create RedisDao from str. // Example: RedisDao::from_str("redis://127.0.0.1").expect("Connect redis failed!"); impl FromStr for RedisDao { type Err = redis::RedisError;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let client = redis::Client::open(s)?;
Ok(RedisDao { inner: client })
}
}
impl RSocket for RedisDao { async fn requestresponse(&self, req: Payload) -> Resultasyncconnection().await?; let value: redis::RedisResultutf8()]) .queryasync(&mut conn) .await; match value { Ok(Some(value)) => Ok(Some(Payload::builder().setdata_utf8(&value).build())), Ok(None) => Ok(None), Err(e) => Err(e.into()), } }
async fn metadata_push(&self, _req: Payload) -> Result<()> {
todo!()
}
async fn fire_and_forget(&self, _req: Payload) -> Result<()> {
todo!()
}
fn request_stream(&self, _req: Payload) -> Flux<Result<Payload>> {
todo!()
}
fn request_channel(&self, _reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>> {
todo!()
}
}
```